[jboss-cvs] JBoss Messaging SVN: r5760 - in trunk: src/main/org/jboss/messaging/core/management and 15 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jan 29 13:24:44 EST 2009
Author: timfox
Date: 2009-01-29 13:24:44 -0500 (Thu, 29 Jan 2009)
New Revision: 5760
Modified:
trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
trunk/src/main/org/jboss/messaging/core/management/NotificationType.java
trunk/src/main/org/jboss/messaging/core/management/impl/AddressControl.java
trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java
trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java
trunk/src/main/org/jboss/messaging/core/postoffice/DuplicateIDCache.java
trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
trunk/src/main/org/jboss/messaging/core/server/Queue.java
trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/util/SimpleString.java
trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java
Log:
More clustering
Modified: trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -68,6 +68,8 @@
public static final SimpleString HDR_ADDRESS = new SimpleString("_JBM_Address");
+ public static final SimpleString HDR_BINDING_ID = new SimpleString("_JBM_Binding_ID");
+
public static final SimpleString HDR_FILTERSTRING = new SimpleString("_JBM_FilterString");
// Attributes ----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/management/NotificationType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/NotificationType.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/management/NotificationType.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -31,5 +31,5 @@
*/
public enum NotificationType
{
- QUEUE_CREATED, QUEUE_DESTROYED, ADDRESS_ADDED, ADDRESS_REMOVED, CONSUMER_CREATED, CONSUMER_CLOSED;
+ BINDING_ADDED, BINDING_REMOVED, ADDRESS_ADDED, ADDRESS_REMOVED, CONSUMER_CREATED, CONSUMER_CLOSED;
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/AddressControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/AddressControl.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/AddressControl.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -84,10 +84,10 @@
{
Bindings bindings = postOffice.getBindingsForAddress(address);
String[] queueNames = new String[bindings.getBindings().size()];
- for (int i = 0; i < bindings.getBindings().size(); i++)
- {
- Binding binding = bindings.getBindings().get(i);
- queueNames[i] = binding.getUniqueName().toString();
+ int i = 0;
+ for (Binding binding: bindings.getBindings())
+ {
+ queueNames[i++] = binding.getUniqueName().toString();
}
return queueNames;
}
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -284,29 +284,14 @@
if (log.isDebugEnabled())
{
log.debug("registered queue " + objectName);
- }
-
- TypedProperties props = new TypedProperties();
-
- log.info("registering queue with address "+ address);
- props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
- props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, queue.getName());
-
- sendNotification(new Notification(NotificationType.QUEUE_CREATED, props));
+ }
}
public void unregisterQueue(final SimpleString name, final SimpleString address) throws Exception
{
ObjectName objectName = getQueueObjectName(address, name);
unregisterResource(objectName);
- messageCounterManager.unregisterMessageCounter(name.toString());
-
- TypedProperties props = new TypedProperties();
-
- props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
- props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, name);
-
- sendNotification(new Notification(NotificationType.QUEUE_DESTROYED, props));
+ messageCounterManager.unregisterMessageCounter(name.toString());
}
public void registerAcceptor(final Acceptor acceptor, final TransportConfiguration configuration) throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -67,7 +67,9 @@
public static final SimpleString HDR_DUPLICATE_DETECTION_ID = new SimpleString("_JBM_DUPL_ID");
- public static final SimpleString HDR_EXCLUDED_QUEUES = new SimpleString("_JBM_EXCLUDED_QUEUES");
+ public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_JBM_ROUTE_TO");
+
+ public static final SimpleString HDR_FROM_CLUSTER = new SimpleString("_JBM_FROM_CLUSTER");
// Attributes ----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -378,9 +378,7 @@
buff.putLong(msg.getMessageID());
- SimpleString duplID = new SimpleString(bytes);
-
- msg.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, duplID);
+ msg.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, bytes);
}
int bytesToWrite = message.getEncodeSize() + PageImpl.SIZE_RECORD;
Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -67,9 +67,9 @@
void updateScheduledDeliveryTime(MessageReference ref) throws Exception;
- void storeDuplicateID(SimpleString address, SimpleString duplID, long recordID) throws Exception;
+ void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception;
- void updateDuplicateID(SimpleString address, SimpleString duplID, long recordID) throws Exception;
+ void updateDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception;
void deleteDuplicateID(long recordID) throws Exception;
@@ -83,9 +83,9 @@
void deleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
- void storeDuplicateIDTransactional(long txID, SimpleString address, SimpleString duplID, long recordID) throws Exception;
+ void storeDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception;
- void updateDuplicateIDTransactional(long txID, SimpleString address, SimpleString duplID, long recordID) throws Exception;
+ void updateDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception;
void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception;
@@ -106,7 +106,7 @@
HierarchicalRepository<QueueSettings> queueSettingsRepository,
Map<Long, Queue> queues,
ResourceManager resourceManager,
- Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap) throws Exception;
+ Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception;
// Bindings related operations
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -79,6 +79,7 @@
import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
import org.jboss.messaging.core.transaction.Transaction.State;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import org.jboss.messaging.util.DataConstants;
import org.jboss.messaging.util.IDGenerator;
import org.jboss.messaging.util.JBMThreadFactory;
import org.jboss.messaging.util.Pair;
@@ -279,14 +280,14 @@
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), SET_SCHEDULED_DELIVERY_TIME, encoding);
}
- public void storeDuplicateID(final SimpleString address, final SimpleString duplID, final long recordID) throws Exception
+ public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding);
}
- public void updateDuplicateID(final SimpleString address, final SimpleString duplID, final long recordID) throws Exception
+ public void updateDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
@@ -386,7 +387,7 @@
public void storeDuplicateIDTransactional(final long txID,
final SimpleString address,
- final SimpleString duplID,
+ final byte[] duplID,
final long recordID) throws Exception
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
@@ -396,7 +397,7 @@
public void updateDuplicateIDTransactional(final long txID,
final SimpleString address,
- final SimpleString duplID,
+ final byte[] duplID,
final long recordID) throws Exception
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
@@ -438,7 +439,7 @@
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final Map<Long, Queue> queues,
final ResourceManager resourceManager,
- final Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap) throws Exception
+ final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
{
List<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -608,16 +609,16 @@
encoding.decode(buff);
- List<Pair<SimpleString, Long>> ids = duplicateIDMap.get(encoding.address);
+ List<Pair<byte[], Long>> ids = duplicateIDMap.get(encoding.address);
if (ids == null)
{
- ids = new ArrayList<Pair<SimpleString, Long>>();
+ ids = new ArrayList<Pair<byte[], Long>>();
duplicateIDMap.put(encoding.address, ids);
}
- ids.add(new Pair<SimpleString, Long>(encoding.duplID, record.id));
+ ids.add(new Pair<byte[], Long>(encoding.duplID, record.id));
break;
}
@@ -671,7 +672,7 @@
final Map<Long, Queue> queues,
final ResourceManager resourceManager,
final List<PreparedTransactionInfo> preparedTransactions,
- final Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap) throws Exception
+ final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
{
final PagingManager pagingManager = postOffice.getPagingManager();
@@ -798,16 +799,16 @@
encoding.decode(buff);
- List<Pair<SimpleString, Long>> ids = duplicateIDMap.get(encoding.address);
+ List<Pair<byte[], Long>> ids = duplicateIDMap.get(encoding.address);
if (ids == null)
{
- ids = new ArrayList<Pair<SimpleString, Long>>();
+ ids = new ArrayList<Pair<byte[], Long>>();
duplicateIDMap.put(encoding.address, ids);
}
- ids.add(new Pair<SimpleString, Long>(encoding.duplID, record.id));
+ ids.add(new Pair<byte[], Long>(encoding.duplID, record.id));
break;
}
@@ -1405,9 +1406,9 @@
{
SimpleString address;
- SimpleString duplID;
+ byte[] duplID;
- public DuplicateIDEncoding(final SimpleString address, final SimpleString duplID)
+ public DuplicateIDEncoding(final SimpleString address, final byte[] duplID)
{
this.address = address;
@@ -1422,19 +1423,25 @@
{
address = buffer.getSimpleString();
- duplID = buffer.getSimpleString();
+ int size = buffer.getInt();
+
+ duplID = new byte[size];
+
+ buffer.getBytes(duplID);
}
public void encode(final MessagingBuffer buffer)
{
buffer.putSimpleString(address);
- buffer.putSimpleString(duplID);
+ buffer.putInt(duplID.length);
+
+ buffer.putBytes(duplID);
}
public int getEncodeSize()
{
- return SimpleString.sizeofString(address) + SimpleString.sizeofString(duplID);
+ return SimpleString.sizeofString(address) + DataConstants.SIZE_INT + duplID.length;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -80,8 +80,7 @@
return true;
}
- public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
- final List<SimpleString> destinations) throws Exception
+ public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, final List<SimpleString> destinations) throws Exception
{
}
@@ -93,11 +92,11 @@
public void rollback(final long txID) throws Exception
{
}
-
+
public void storeReference(final long queueID, final long messageID) throws Exception
{
}
-
+
public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
}
@@ -150,24 +149,24 @@
{
}
- public void storeDuplicateID(final SimpleString address, final SimpleString duplID, final long recordID) throws Exception
+ public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
{
}
public void storeDuplicateIDTransactional(final long txID,
final SimpleString address,
- final SimpleString duplID,
+ final byte[] duplID,
final long recordID) throws Exception
{
}
- public void updateDuplicateID(final SimpleString address, final SimpleString duplID, final long recordID) throws Exception
+ public void updateDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
{
}
public void updateDuplicateIDTransactional(final long txID,
final SimpleString address,
- final SimpleString duplID,
+ final byte[] duplID,
final long recordID) throws Exception
{
}
@@ -214,12 +213,12 @@
{
}
- public void loadMessageJournal(final PostOffice postOffice,
- final StorageManager storageManager,
- final HierarchicalRepository<QueueSettings> queueSettingsRepository,
- final Map<Long, Queue> queues,
- final ResourceManager resourceManager,
- final Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap) throws Exception
+ public void loadMessageJournal(PostOffice postOffice,
+ StorageManager storageManager,
+ HierarchicalRepository<QueueSettings> queueSettingsRepository,
+ Map<Long, Queue> queues,
+ ResourceManager resourceManager,
+ Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
{
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -48,6 +48,13 @@
boolean filterMatches(ServerMessage message) throws Exception;
boolean isHighAcceptPriority(ServerMessage message);
+
+ //TODO find a better way
+ void willRoute(ServerMessage message);
boolean isExclusive();
+
+ int getID();
+
+ void setID(int id);
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -22,7 +22,7 @@
package org.jboss.messaging.core.postoffice;
-import java.util.List;
+import java.util.Collection;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.transaction.Transaction;
@@ -38,10 +38,8 @@
*/
public interface Bindings
{
- List<Binding> getBindings();
+ Collection<Binding> getBindings();
- void route(final ServerMessage message) throws Exception;
-
void route(ServerMessage message, Transaction tx) throws Exception;
void addBinding(Binding binding);
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/DuplicateIDCache.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/DuplicateIDCache.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/DuplicateIDCache.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -26,7 +26,6 @@
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.util.Pair;
-import org.jboss.messaging.util.SimpleString;
/**
* A DuplicateIDCache
@@ -39,9 +38,9 @@
*/
public interface DuplicateIDCache
{
- boolean contains(SimpleString duplicateID);
+ boolean contains(byte[] duplicateID);
- void addToCache(SimpleString duplicateID, Transaction tx) throws Exception;
+ void addToCache(byte[] duplicateID, Transaction tx) throws Exception;
- void load(List<Pair<SimpleString, Long>> theIds) throws Exception;
+ void load(List<Pair<byte[], Long>> theIds) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -45,14 +45,17 @@
private final SimpleString address;
+ private final int id;
+
private List<SimpleString> filterStrings;
private int numberOfConsumers;
- public QueueInfo(final SimpleString queueName, final SimpleString address)
+ public QueueInfo(final SimpleString queueName, final SimpleString address, final int id)
{
this.queueName = queueName;
this.address = address;
+ this.id = id;
}
public SimpleString getQueueName()
@@ -64,6 +67,11 @@
{
return address;
}
+
+ public int getID()
+ {
+ return id;
+ }
public List<SimpleString> getFilterStrings()
{
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -22,6 +22,8 @@
package org.jboss.messaging.core.postoffice.impl;
+import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -31,6 +33,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.Bindings;
import org.jboss.messaging.core.server.Bindable;
@@ -55,13 +58,13 @@
private final Map<SimpleString, Integer> routingNamePositions = new ConcurrentHashMap<SimpleString, Integer>();
- private final List<Binding> bindingsList = new CopyOnWriteArrayList<Binding>();
+ private final Map<Integer, Binding> bindingsMap = new ConcurrentHashMap<Integer, Binding>();
private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
- public List<Binding> getBindings()
+ public Collection<Binding> getBindings()
{
- return bindingsList;
+ return bindingsMap.values();
}
public void addBinding(final Binding binding)
@@ -91,7 +94,7 @@
bindings.add(binding);
}
- bindingsList.add(binding);
+ bindingsMap.put(binding.getID(), binding);
}
public void removeBinding(final Binding binding)
@@ -117,15 +120,48 @@
}
}
- bindingsList.remove(binding);
+ bindingsMap.remove(binding.getID());
}
- public void route(ServerMessage message) throws Exception
+ private void routeFromCluster(final ServerMessage message, final Transaction tx) throws Exception
{
- route(message, null);
+ byte[] ids = (byte[])message.getProperty(MessageImpl.HDR_ROUTE_TO_IDS);
+
+ ByteBuffer buff = ByteBuffer.wrap(ids);
+
+ Set<Bindable> chosen = new HashSet<Bindable>();
+
+ while (buff.hasRemaining())
+ {
+ int bindingID = buff.getInt();
+
+ Binding binding = bindingsMap.get(bindingID);
+
+ if (binding == null)
+ {
+ //The binding has been closed - we need to route the message somewhere else...............
+ throw new IllegalStateException("Binding not found when routing from cluster - it must have closed");
+
+ //FIXME need to deal with this better
+ }
+
+ binding.willRoute(message);
+
+ chosen.add(binding.getBindable());
+ }
+
+ for (Bindable bindable : chosen)
+ {
+ bindable.preroute(message, tx);
+ }
+
+ for (Bindable bindable : chosen)
+ {
+ bindable.route(message, tx);
+ }
}
- public void route(ServerMessage message, Transaction tx) throws Exception
+ public void route(final ServerMessage message, final Transaction tx) throws Exception
{
if (!exclusiveBindings.isEmpty())
{
@@ -136,130 +172,137 @@
}
else
{
- Set<Bindable> chosen = new HashSet<Bindable>();
-
- for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
+ if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
{
- SimpleString routingName = entry.getKey();
-
- List<Binding> bindings = entry.getValue();
-
- if (bindings == null)
+ routeFromCluster(message, tx);
+ }
+ else
+ {
+ Set<Bindable> chosen = new HashSet<Bindable>();
+
+ for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
{
- // The value can become null if it's concurrently removed while we're iterating - this is expected
- // ConcurrentHashMap behaviour!
- continue;
- }
-
- Integer ipos = routingNamePositions.get(routingName);
-
- int pos = ipos != null ? ipos.intValue() : 0;
-
- int length = bindings.size();
-
- int startPos = pos;
-
- Binding theBinding = null;
-
- int lastNoMatchingConsumerPos = -1;
-
- while (true)
- {
- Binding binding;
- try
+ SimpleString routingName = entry.getKey();
+
+ List<Binding> bindings = entry.getValue();
+
+ if (bindings == null)
{
- binding = bindings.get(pos);
+ // The value can become null if it's concurrently removed while we're iterating - this is expected
+ // ConcurrentHashMap behaviour!
+ continue;
}
- catch (IndexOutOfBoundsException e)
+
+ Integer ipos = routingNamePositions.get(routingName);
+
+ int pos = ipos != null ? ipos.intValue() : 0;
+
+ int length = bindings.size();
+
+ int startPos = pos;
+
+ Binding theBinding = null;
+
+ int lastNoMatchingConsumerPos = -1;
+
+ while (true)
{
- // This can occur if binding is removed while in route
- if (!bindings.isEmpty())
+ Binding binding;
+ try
{
- pos = 0;
-
- continue;
+ binding = bindings.get(pos);
}
- else
+ catch (IndexOutOfBoundsException e)
{
- break;
+ // This can occur if binding is removed while in route
+ if (!bindings.isEmpty())
+ {
+ pos = 0;
+
+ continue;
+ }
+ else
+ {
+ break;
+ }
}
- }
-
- if (binding.filterMatches(message))
- {
- // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
- // unnecessary overhead)
- if (length == 1 || binding.isHighAcceptPriority(message))
+
+ if (binding.filterMatches(message))
{
- theBinding = binding;
-
- pos = incrementPos(pos, length);
-
- break;
- }
- else
- {
- lastNoMatchingConsumerPos = pos;
- }
- }
-
- pos = incrementPos(pos, length);
-
- if (pos == startPos)
- {
- if (lastNoMatchingConsumerPos != -1)
- {
- try
+ // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
+ // unnecessary overhead)
+ if (length == 1 || binding.isHighAcceptPriority(message))
{
- theBinding = bindings.get(pos);
+ theBinding = binding;
+
+ pos = incrementPos(pos, length);
+
+ break;
}
- catch (IndexOutOfBoundsException e)
+ else
{
- // This can occur if binding is removed while in route
- if (!bindings.isEmpty())
+ lastNoMatchingConsumerPos = pos;
+ }
+ }
+
+ pos = incrementPos(pos, length);
+
+ if (pos == startPos)
+ {
+ if (lastNoMatchingConsumerPos != -1)
+ {
+ try
{
- pos = 0;
-
- lastNoMatchingConsumerPos = -1;
-
- continue;
+ theBinding = bindings.get(pos);
}
- else
+ catch (IndexOutOfBoundsException e)
{
- break;
+ // This can occur if binding is removed while in route
+ if (!bindings.isEmpty())
+ {
+ pos = 0;
+
+ lastNoMatchingConsumerPos = -1;
+
+ continue;
+ }
+ else
+ {
+ break;
+ }
}
+
+ pos = lastNoMatchingConsumerPos;
+
+ pos = incrementPos(pos, length);
}
-
- pos = lastNoMatchingConsumerPos;
-
- pos = incrementPos(pos, length);
+ break;
}
- break;
}
+
+ if (theBinding != null)
+ {
+ theBinding.willRoute(message);
+
+ chosen.add(theBinding.getBindable());
+ }
+
+ routingNamePositions.put(routingName, pos);
}
-
- if (theBinding != null)
+
+ //TODO refactor to do this is one iteration
+
+ for (Bindable bindable : chosen)
{
- chosen.add(theBinding.getBindable());
+ bindable.preroute(message, tx);
}
-
- routingNamePositions.put(routingName, pos);
-
+
+ for (Bindable bindable : chosen)
+ {
+ bindable.route(message, tx);
+ }
}
-
- //TODO refactor to do this is one iteration
-
- for (Bindable bindable : chosen)
- {
- bindable.preroute(message, tx);
- }
-
- for (Bindable bindable : chosen)
- {
- bindable.route(message, tx);
- }
}
-
}
private final int incrementPos(int pos, int length)
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -53,6 +53,8 @@
private final boolean exclusive;
+ private int id;
+
public DivertBinding(final SimpleString address, final Divert divert)
{
this.address = address;
@@ -67,6 +69,17 @@
this.exclusive = divert.isExclusive();
}
+
+ public int getID()
+ {
+ return id;
+ }
+
+ public void setID(final int id)
+ {
+ this.id = id;
+ }
+
public boolean filterMatches(final ServerMessage message) throws Exception
{
@@ -109,6 +122,10 @@
{
return true;
}
+
+ public void willRoute(final ServerMessage message)
+ {
+ }
public boolean isQueueBinding()
{
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -50,89 +50,57 @@
public class DuplicateIDCacheImpl implements DuplicateIDCache
{
private static final Logger log = Logger.getLogger(DuplicateIDCacheImpl.class);
-
- public static volatile boolean debug;
-
- private static Set<DuplicateIDCacheImpl> caches = new ConcurrentHashSet<DuplicateIDCacheImpl>();
- public static void dumpCaches()
- {
- for (DuplicateIDCacheImpl cache : caches)
- {
- log.info("Dumping cache for address: " + cache.address);
- log.info("First the set:");
- for (SimpleString duplID : cache.cache)
- {
- log.info(duplID);
- }
- log.info("End set");
- log.info("Now the list:");
- for (Pair<SimpleString, Long> id : cache.ids)
- {
- log.info(id.a + ":" + id.b);
- }
- log.info("End dump");
- }
- }
+ private final Set<ByteArrayHolder> cache = new ConcurrentHashSet<ByteArrayHolder>();
- private final Set<SimpleString> cache = new ConcurrentHashSet<SimpleString>();
-
private final SimpleString address;
// Note - deliberately typed as ArrayList since we want to ensure fast indexed
// based array access
- private final ArrayList<Pair<SimpleString, Long>> ids;
+ private final ArrayList<Pair<ByteArrayHolder, Long>> ids;
private int pos;
private int cacheSize;
private final StorageManager storageManager;
-
+
private final boolean persist;
+
- public DuplicateIDCacheImpl(final SimpleString address, final int size, final StorageManager storageManager,
+ public DuplicateIDCacheImpl(final SimpleString address,
+ final int size,
+ final StorageManager storageManager,
final boolean persist)
{
this.address = address;
this.cacheSize = size;
- this.ids = new ArrayList<Pair<SimpleString, Long>>(size);
+ this.ids = new ArrayList<Pair<ByteArrayHolder, Long>>(size);
this.storageManager = storageManager;
-
+
this.persist = persist;
-
- if (debug)
- {
- caches.add(this);
- }
}
- protected void finalize() throws Throwable
+ public void load(final List<Pair<byte[], Long>> theIds) throws Exception
{
- if (debug)
- {
- caches.remove(this);
- }
-
- super.finalize();
- }
-
- public void load(final List<Pair<SimpleString, Long>> theIds) throws Exception
- {
int count = 0;
long txID = -1;
-
- for (Pair<SimpleString, Long> id : theIds)
+
+ for (Pair<byte[], Long> id : theIds)
{
if (count < cacheSize)
{
- cache.add(id.a);
+ ByteArrayHolder bah = new ByteArrayHolder(id.a);
- ids.add(id);
+ Pair<ByteArrayHolder, Long> pair = new Pair<ByteArrayHolder, Long>(bah, id.b);
+
+ cache.add(bah);
+
+ ids.add(pair);
}
else
{
@@ -156,15 +124,15 @@
pos = theIds.size();
}
- public boolean contains(final SimpleString duplID)
+ public boolean contains(final byte[] duplID)
{
- return cache.contains(duplID);
+ return cache.contains(new ByteArrayHolder(duplID));
}
-
- public synchronized void addToCache(final SimpleString duplID, final Transaction tx) throws Exception
+
+ public synchronized void addToCache(final byte[] duplID, final Transaction tx) throws Exception
{
long recordID = storageManager.generateUniqueID();
-
+
if (tx == null)
{
if (persist)
@@ -175,25 +143,25 @@
addToCacheInMemory(duplID, recordID);
}
else
- {
+ {
if (persist)
{
storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID);
-
+
tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
}
-
+
// For a tx, it's important that the entry is not added to the cache until commit (or prepare)
// since if the client fails then resends them tx we don't want it to get rejected
- tx.addOperation(new AddDuplicateIDOperation(duplID, recordID));
+ tx.addOperation(new AddDuplicateIDOperation(duplID, recordID));
}
}
- private void addToCacheInMemory(final SimpleString duplID, final long recordID) throws Exception
+ private void addToCacheInMemory(final byte[] duplID, final long recordID) throws Exception
{
- cache.add(duplID);
+ cache.add(new ByteArrayHolder(duplID));
- Pair<SimpleString, Long> id;
+ Pair<ByteArrayHolder, Long> id;
if (pos < ids.size())
{
@@ -205,15 +173,15 @@
// Record already exists - we delete the old one and add the new one
// Note we can't use update since journal update doesn't let older records get
// reclaimed
- id.a = duplID;
-
+ id.a = new ByteArrayHolder(duplID);
+
storageManager.deleteDuplicateID(id.b);
id.b = recordID;
}
else
{
- id = new Pair<SimpleString, Long>(duplID, recordID);
+ id = new Pair<ByteArrayHolder, Long>(new ByteArrayHolder(duplID), recordID);
ids.add(id);
}
@@ -225,14 +193,14 @@
}
private class AddDuplicateIDOperation implements TransactionOperation
- {
- final SimpleString duplID;
+ {
+ final byte[] duplID;
final long recordID;
volatile boolean done;
- AddDuplicateIDOperation(final SimpleString duplID, final long recordID)
+ AddDuplicateIDOperation(final byte[] duplID, final long recordID)
{
this.duplID = duplID;
@@ -248,7 +216,7 @@
done = true;
}
}
-
+
public void beforeCommit(final Transaction tx) throws Exception
{
}
@@ -276,4 +244,56 @@
}
}
+
+ private static final class ByteArrayHolder
+ {
+ ByteArrayHolder(final byte[] bytes)
+ {
+ this.bytes = bytes;
+ }
+
+ final byte[] bytes;
+
+ int hash;
+
+ public boolean equals(Object other)
+ {
+ if (other instanceof ByteArrayHolder)
+ {
+ ByteArrayHolder s = (ByteArrayHolder)other;
+
+ if (bytes.length != s.bytes.length)
+ {
+ return false;
+ }
+
+ for (int i = 0; i < bytes.length; i++)
+ {
+ if (bytes[i] != s.bytes[i])
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public int hashCode()
+ {
+ if (hash == 0)
+ {
+ for (int i = 0; i < bytes.length; i++)
+ {
+ hash = 31 * hash + bytes[i];
+ }
+ }
+
+ return hash;
+ }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -52,6 +52,8 @@
private final SimpleString name;
+ private int id;
+
public LocalQueueBinding(final SimpleString address, final Queue queue)
{
this.address = address;
@@ -62,6 +64,17 @@
this.name = queue.getName();
}
+
+ public int getID()
+ {
+ return id;
+ }
+
+ public void setID(final int id)
+ {
+ this.id = id;
+ }
+
public boolean filterMatches(final ServerMessage message) throws Exception
{
@@ -125,6 +138,12 @@
return false;
}
+
+ public void willRoute(final ServerMessage message)
+ {
+ }
+
+
public boolean isQueueBinding()
{
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -114,7 +114,19 @@
private final int idCacheSize;
private final boolean persistIDCache;
-
+
+ //Each queue has a transient ID which lasts the lifetime of its binding. This is used in clustering when routing messages to particular queues on nodes. We could
+ //use the queue name on the node to identify it. But sometimes we need to route to maybe 10s of thousands of queues on a particular node, and all would
+ //have to be specified in the message. Specify 10000 ints takes up a lot less space than 10000 arbitrary queue names
+ //The drawback of this approach is we only allow up to 2^32 queues in memory at any one time
+ private int transientIDSequence;
+
+ private Set<Integer> transientIDs = new HashSet<Integer>();
+
+ private Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString, QueueInfo>();
+
+ private final Object notificationLock = new Object();
+
public PostOfficeImpl(final StorageManager storageManager,
final PagingManager pagingManager,
final QueueFactory bindableFactory,
@@ -215,17 +227,13 @@
// NotificationListener implementation -------------------------------------
- private Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString, QueueInfo>();
-
- private final Object notificationLock = new Object();
-
public void onNotification(final Notification notification)
{
synchronized (notificationLock)
{
NotificationType type = notification.getType();
- if (type == NotificationType.QUEUE_CREATED)
+ if (type == NotificationType.BINDING_ADDED)
{
TypedProperties props = notification.getProperties();
@@ -233,11 +241,13 @@
SimpleString address = (SimpleString)props.getProperty(ManagementHelper.HDR_ADDRESS);
- QueueInfo info = new QueueInfo(queueName, address);
+ Integer transientID = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_ID);
+ QueueInfo info = new QueueInfo(queueName, address, transientID);
+
queueInfos.put(queueName, info);
}
- else if (type == NotificationType.QUEUE_DESTROYED)
+ else if (type == NotificationType.BINDING_REMOVED)
{
TypedProperties props = notification.getProperties();
@@ -294,87 +304,8 @@
}
}
}
-
- private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
- {
- ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
- message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
-
- message.setDestination(queueName);
-
- message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
- message.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
-
- return message;
- }
-
- public void sendQueueInfoToQueue(final SimpleString queueName) throws Exception
- {
- //We send direct to the queue so we can send it to the same queue that is bound to the notifications adress - this is crucial for ensuring
- //that queue infos and notifications are received in a contiguous consistent stream
- Binding binding = addressManager.getBinding(queueName);
-
- if (binding == null)
- {
- throw new IllegalStateException("Cannot find queue " + queueName);
- }
-
- Queue queue = (Queue)binding.getBindable();
-
- //Need to lock to make sure all queue info and notifications are in the correct order with no gaps
- synchronized (notificationLock)
- {
- //First send a reset message
-
- ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
- message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
- message.setDestination(queueName);
- message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
-
- queue.preroute(message, null);
- queue.route(message, null);
-
- for (QueueInfo info: queueInfos.values())
- {
- log.info("creatign queue created message");
- message = createQueueInfoMessage(NotificationType.QUEUE_CREATED, queueName);
-
- message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
- message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
-
- queue.preroute(message, null);
- queue.route(message, null);
-
- int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
-
- for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; i++)
- {
- message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
-
- message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
-
- queue.preroute(message, null);
- queue.route(message, null);
- }
-
- if (info.getFilterStrings() != null)
- {
- for (SimpleString filterString: info.getFilterStrings())
- {
- message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
-
- message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
- message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
-
- queue.preroute(message, null);
- queue.route(message, null);
- }
- }
- }
- }
-
- }
+
// PostOffice implementation -----------------------------------------------
public synchronized boolean addDestination(final SimpleString address, final boolean durable) throws Exception
@@ -429,21 +360,40 @@
// even though failover is complete
public synchronized void addBinding(final Binding binding) throws Exception
{
+ binding.setID(generateTransientID());
+
addBindingInMemory(binding);
+
+ TypedProperties props = new TypedProperties();
+
+ props.putStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+ props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, binding.getRoutingName());
+ props.putIntProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
+
+ managementService.sendNotification(new Notification(NotificationType.BINDING_ADDED, props));
}
public synchronized Binding removeBinding(final SimpleString uniqueName) throws Exception
{
Binding binding = removeBindingInMemory(uniqueName);
-
+
if (binding.isQueueBinding())
{
managementService.unregisterQueue(uniqueName, binding.getAddress());
}
+
+ TypedProperties props = new TypedProperties();
+
+ props.putStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+ props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, binding.getRoutingName());
+ managementService.sendNotification(new Notification(NotificationType.BINDING_REMOVED, props));
+
+ releaseTransientID(binding.getID());
+
return binding;
}
-
+
public Bindings getBindingsForAddress(final SimpleString address)
{
Bindings bindings = addressManager.getBindings(address);
@@ -474,7 +424,7 @@
}
}
- SimpleString duplicateID = (SimpleString)message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
+ byte[] duplicateID = (byte[])message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
DuplicateIDCache cache = null;
@@ -620,8 +570,115 @@
return cache;
}
+
+
+ public void sendQueueInfoToQueue(final SimpleString queueName) throws Exception
+ {
+ //We send direct to the queue so we can send it to the same queue that is bound to the notifications adress - this is crucial for ensuring
+ //that queue infos and notifications are received in a contiguous consistent stream
+ Binding binding = addressManager.getBinding(queueName);
+
+ if (binding == null)
+ {
+ throw new IllegalStateException("Cannot find queue " + queueName);
+ }
+
+ Queue queue = (Queue)binding.getBindable();
+
+ //Need to lock to make sure all queue info and notifications are in the correct order with no gaps
+ synchronized (notificationLock)
+ {
+ //First send a reset message
+
+ ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
+ message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
+ message.setDestination(queueName);
+ message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
+
+ queue.preroute(message, null);
+ queue.route(message, null);
+
+ for (QueueInfo info: queueInfos.values())
+ {
+ message = createQueueInfoMessage(NotificationType.BINDING_ADDED, queueName);
+
+ message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
+ message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
+ message.putIntProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
+
+ queue.preroute(message, null);
+ queue.route(message, null);
+
+ int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
+
+ for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; i++)
+ {
+ message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+
+ message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
+
+ queue.preroute(message, null);
+ queue.route(message, null);
+ }
+
+ if (info.getFilterStrings() != null)
+ {
+ for (SimpleString filterString: info.getFilterStrings())
+ {
+ message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+
+ message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
+ message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
+
+ queue.preroute(message, null);
+ queue.route(message, null);
+ }
+ }
+ }
+ }
+
+ }
+
// Private -----------------------------------------------------------------
+ private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
+ {
+ ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
+ message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
+
+ message.setDestination(queueName);
+
+ message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
+ message.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
+
+ return message;
+ }
+
+
+ private int generateTransientID()
+ {
+ int start = transientIDSequence;
+ do
+ {
+ int id = transientIDSequence++;
+
+ if (!transientIDs.contains(id))
+ {
+ transientIDs.add(id);
+
+ return id;
+ }
+ }
+ while (transientIDSequence != start);
+
+ throw new IllegalStateException("Run out of queue ids!");
+ }
+
+ private void releaseTransientID(final int id)
+ {
+ transientIDs.remove(id);
+ }
+
private final PageMessageOperation getPageOperation(final Transaction tx)
{
PageMessageOperation oper = (PageMessageOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_MESSAGES_OPERATION);
@@ -657,7 +714,7 @@
}
managementService.registerQueue(queue, binding.getAddress(), storageManager);
- }
+ }
}
private Binding removeBindingInMemory(final SimpleString bindingName) throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -21,16 +21,17 @@
*/
package org.jboss.messaging.core.postoffice.impl;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.postoffice.Address;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.Bindings;
import org.jboss.messaging.util.SimpleString;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/**
* extends the simple manager to allow wildcard addresses to be used.
*
@@ -72,7 +73,7 @@
Bindings b = super.getBindings(destAdd.getAddress());
if (b != null)
{
- List<Binding> theBindings = b.getBindings();
+ Collection<Binding> theBindings = b.getBindings();
for (Binding theBinding : theBindings)
{
super.addMappingInternal(address, theBinding);
Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -47,7 +47,7 @@
long getPersistenceID();
void setPersistenceID(long id);
-
+
Filter getFilter();
boolean isDurable();
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -48,6 +48,8 @@
int decrementRefCount();
ServerMessage copy(long newID) throws Exception;
+
+ ServerMessage copy() throws Exception;
int getMemoryEstimate();
@@ -58,4 +60,6 @@
boolean isStored();
int getRefCount();
+
+ //TODO - we might be able to put this in a better place
}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -24,7 +24,9 @@
import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
+import java.util.HashSet;
import java.util.LinkedList;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -47,6 +49,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.NotificationType;
import org.jboss.messaging.core.management.impl.ManagementServiceImpl;
+import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.remoting.FailureListener;
import org.jboss.messaging.core.remoting.RemotingConnection;
@@ -135,9 +138,13 @@
private final int maxRetriesAfterFailover;
private final MessageHandler queueInfoMessageHandler;
-
+
private final String queueDataAddress;
+ private final SimpleString idsHeaderName;
+
+ private final boolean forClusterConnector;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -161,7 +168,8 @@
final int maxRetriesAfterFailover,
final boolean useDuplicateDetection,
final MessageHandler queueInfoMessageHandler,
- final String queueDataAddress) throws Exception
+ final String queueDataAddress,
+ final boolean forClusterConnector) throws Exception
{
log.info("Creating new bridge " + name + " queue " + queue);
this.name = name;
@@ -204,11 +212,15 @@
this.maxRetriesAfterFailover = maxRetriesAfterFailover;
this.queueInfoMessageHandler = queueInfoMessageHandler;
-
+
log.info("queue info handler " + this.queueInfoMessageHandler);
-
+
this.queueDataAddress = queueDataAddress;
+ this.forClusterConnector = forClusterConnector;
+
+ this.idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
+
if (maxBatchTime != -1)
{
future = scheduledExecutor.scheduleAtFixedRate(new BatchTimeout(),
@@ -258,28 +270,34 @@
session.addFailureListener(BridgeImpl.this);
+ // TODO - we should move this code to the ClusterConnectorImpl - and just execute it when the bridge
+ // connection is opened and closed - we can use
+ // a callback to tell us that
if (queueInfoMessageHandler != null)
{
// Get the queue data
- SimpleString notifQueueName = new SimpleString("notif-").concat(UUIDGenerator.getInstance().generateSimpleStringUUID());
+ SimpleString notifQueueName = new SimpleString("notif-").concat(UUIDGenerator.getInstance()
+ .generateSimpleStringUUID());
- SimpleString filter = new SimpleString(
- ManagementHelper.HDR_NOTIFICATION_TYPE + " IN (" +
+ SimpleString filter = new SimpleString(ManagementHelper.HDR_NOTIFICATION_TYPE + " IN (" +
"'" +
- NotificationType.QUEUE_CREATED +
+ NotificationType.BINDING_ADDED +
"'," +
"'" +
- NotificationType.QUEUE_DESTROYED +
+ NotificationType.BINDING_REMOVED +
"'," +
"'" +
NotificationType.CONSUMER_CREATED +
"'," +
"'" +
NotificationType.CONSUMER_CLOSED +
- "') AND " +
- ManagementHelper.HDR_ADDRESS + " LIKE '" + queueDataAddress + "%'");
-
+ "') AND " +
+ ManagementHelper.HDR_ADDRESS +
+ " LIKE '" +
+ queueDataAddress +
+ "%'");
+
session.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueueName, filter, false, true);
ClientConsumer notifConsumer = session.createConsumer(notifQueueName);
@@ -294,12 +312,12 @@
ManagementServiceImpl.getMessagingServerObjectName(),
"sendQueueInfoToQueue",
notifQueueName.toString());
-
+
ClientProducer prod = session.createProducer(ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS);
-
+
prod.send(message);
}
-
+
log.info("Created objects");
active = true;
@@ -323,7 +341,7 @@
{
return;
}
-
+
started = false;
active = false;
@@ -476,10 +494,10 @@
{
return;
}
-
+
log.info("sending batch");
- // TODO - if batch size = 1 then don't need tx
+ // TODO - if batch size = 1 then don't need tx - actually we should use asynch send acknowledgement stream - then we don't need a transaction at all
while (true)
{
@@ -493,6 +511,32 @@
ref.getQueue().acknowledge(tx, ref);
ServerMessage message = ref.getMessage();
+
+ if (this.forClusterConnector)
+ {
+ //We make a shallow copy of the message, then we strip out the unwanted routing id headers and leave only
+ //the one pertinent for the destination node - this is important since different queues on different nodes could have same queue ids
+ //Note we must copy since same message may get routed to other nodes which require different headers
+ message = message.copy();
+
+ //TODO - we can optimise this
+
+ Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
+
+ byte[] queueIds = (byte[])message.getProperty(idsHeaderName);
+
+ for (SimpleString propName: propNames)
+ {
+ if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
+ {
+ message.removeProperty(propName);
+ }
+ }
+
+ message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+
+ message.putBooleanProperty(MessageImpl.HDR_FROM_CLUSTER, Boolean.TRUE);
+ }
if (transformer != null)
{
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -310,9 +310,10 @@
bridgeConfig.getRetryIntervalMultiplier(),
bridgeConfig.getMaxRetriesBeforeFailover(),
bridgeConfig.getMaxRetriesAfterFailover(),
- false,
+ false, // Duplicate detection is handled in the RemoteQueueBindingImpl
record,
- address.toString());
+ address.toString(),
+ true);
record.setBridge(bridge);
@@ -438,31 +439,34 @@
log.info("Got notification message " + type);
- if (type == NotificationType.QUEUE_CREATED)
+ if (type == NotificationType.BINDING_ADDED)
{
log.info("queue created");
- SimpleString uniqueName = new SimpleString("flow-").concat(UUIDGenerator.getInstance()
- .generateSimpleStringUUID());
+ SimpleString uniqueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
SimpleString queueAddress = (SimpleString)message.getProperty(ManagementHelper.HDR_ADDRESS);
SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+
+ Integer queueID = (Integer)message.getProperty(ManagementHelper.HDR_BINDING_ID);
RemoteQueueBinding binding = new RemoteQueueBindingImpl(queueAddress,
uniqueName,
queueName,
+ queueID,
filterString,
queue,
useDuplicateDetection,
- forwardWhenNoMatchingConsumers);
+ forwardWhenNoMatchingConsumers,
+ bridge.getName());
bindings.put(queueName, binding);
postOffice.addBinding(binding);
}
- else if (type == NotificationType.QUEUE_DESTROYED)
+ else if (type == NotificationType.BINDING_REMOVED)
{
log.info("queue destroyed");
SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -399,7 +399,8 @@
config.getMaxRetriesAfterFailover(),
config.isUseDuplicateDetection(),
null,
- null);
+ null,
+ false);
bridges.put(config.getName(), bridge);
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.server.cluster.impl;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -30,11 +31,13 @@
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.filter.impl.FilterImpl;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.server.Bindable;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.UUIDGenerator;
/**
* A RemoteQueueBindingImpl
@@ -56,6 +59,8 @@
private final SimpleString uniqueName;
private final SimpleString routingName;
+
+ private final int remoteQueueID;
private final Filter queueFilter;
@@ -68,14 +73,20 @@
private final boolean duplicateDetection;
private final boolean forwardWhenNoMatchingConsumers;
-
+
+ private final SimpleString idsHeaderName;
+
+ private int id;
+
public RemoteQueueBindingImpl(final SimpleString address,
final SimpleString uniqueName,
final SimpleString routingName,
+ final int remoteQueueID,
final SimpleString filterString,
final Queue storeAndForwardQueue,
final boolean duplicateDetection,
- final boolean forwardWhenNoMatchingConsumers) throws Exception
+ final boolean forwardWhenNoMatchingConsumers,
+ final SimpleString bridgeName) throws Exception
{
this.address = address;
@@ -84,6 +95,8 @@
this.uniqueName = uniqueName;
this.routingName = routingName;
+
+ this.remoteQueueID = remoteQueueID;
this.duplicateDetection = duplicateDetection;
@@ -97,8 +110,20 @@
}
this.forwardWhenNoMatchingConsumers = forwardWhenNoMatchingConsumers;
+
+ this.idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(bridgeName);
}
-
+
+ public int getID()
+ {
+ return id;
+ }
+
+ public void setID(final int id)
+ {
+ this.id = id;
+ }
+
public SimpleString getAddress()
{
return address;
@@ -170,6 +195,43 @@
return false;
}
+
+ public void willRoute(final ServerMessage message)
+ {
+ //We add a header with the name of the queue, holding a list of the transient ids of the queues to route to
+
+ //TODO - this can be optimised
+
+ byte[] ids = (byte[])message.getProperty(idsHeaderName);
+
+ if (ids == null)
+ {
+ ids = new byte[4];
+ }
+ else
+ {
+ byte[] newIds = new byte[ids.length + 4];
+
+ System.arraycopy(ids, 0, newIds, 4, ids.length);
+
+ ids = newIds;
+ }
+
+ ByteBuffer buff = ByteBuffer.wrap(ids);
+
+ buff.putInt(remoteQueueID);
+
+ message.putBytesProperty(idsHeaderName, ids);
+
+ //Now add a duplicate detection header, if required.
+ //We use a GUID for this
+ if (duplicateDetection)
+ {
+ byte[] guid = UUIDGenerator.getInstance().generateUUID().asBytes();
+
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, guid);
+ }
+ }
public synchronized void addConsumer(final SimpleString filterString) throws Exception
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -296,7 +296,7 @@
postOffice.addBinding(binding);
}
- Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<SimpleString, Long>>>();
+ Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
storageManager.loadMessageJournal(postOffice,
@@ -306,7 +306,7 @@
resourceManager,
duplicateIDMap);
- for (Map.Entry<SimpleString, List<Pair<SimpleString, Long>>> entry : duplicateIDMap.entrySet())
+ for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet())
{
SimpleString address = entry.getKey();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -51,7 +51,7 @@
private PostOffice postOffice;
private final StorageManager storageManager;
-
+
public QueueFactoryImpl(final ScheduledExecutorService scheduledExecutor,
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final StorageManager storageManager)
@@ -75,7 +75,7 @@
final boolean temporary)
{
QueueSettings queueSettings = queueSettingsRepository.getMatch(name.toString());
-
+
Queue queue = new QueueImpl(persistenceID,
name,
filter,
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -73,7 +73,7 @@
public static final int NUM_PRIORITIES = 10;
private volatile long persistenceID = -1;
-
+
private final SimpleString name;
private volatile Filter filter;
@@ -116,9 +116,7 @@
private int consumersToFailover = -1;
- // private final SimpleString routeToPropertyName;
-
- public QueueImpl(final long persistenceID,
+ public QueueImpl(final long persistenceID,
final SimpleString name,
final Filter filter,
final boolean durable,
@@ -129,7 +127,7 @@
final HierarchicalRepository<QueueSettings> queueSettingsRepository)
{
this.persistenceID = persistenceID;
-
+
this.name = name;
this.filter = filter;
@@ -329,7 +327,7 @@
{
persistenceID = id;
}
-
+
public Filter getFilter()
{
return filter;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -160,6 +160,13 @@
return m;
}
+
+ public ServerMessage copy() throws Exception
+ {
+ ServerMessage m = new ServerMessageImpl(this);
+
+ return m;
+ }
@Override
public String toString()
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -1498,7 +1498,7 @@
{
storageManager.addQueueBinding(binding);
}
-
+
postOffice.addBinding(binding);
if (temporary)
Modified: trunk/src/main/org/jboss/messaging/util/SimpleString.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/SimpleString.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/util/SimpleString.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.util;
@@ -30,7 +30,6 @@
import org.jboss.messaging.core.logging.Logger;
-
/**
*
* A SimpleString
@@ -44,220 +43,219 @@
public class SimpleString implements CharSequence, Serializable, Comparable<SimpleString>
{
private static final long serialVersionUID = 4204223851422244307L;
-
+
private static final Logger log = Logger.getLogger(SimpleString.class);
-
// Attributes
- // ------------------------------------------------------------------------
- private final byte[] data;
-
- private transient int hash;
-
- //Cache the string
- private transient String str;
-
+ // ------------------------------------------------------------------------
+ private final byte[] data;
+
+ private transient int hash;
+
+ // Cache the string
+ private transient String str;
+
// Static
- // ----------------------------------------------------------------------
-
- /**
- * Returns a SimpleString constructed from the <code>string</code> parameter.
- * If <code>string</code> is <code>null</code>, the return value will be <code>null</code> too.
- */
- public static SimpleString toSimpleString(final String string)
- {
- if (string == null)
- {
- return null;
- }
- return new SimpleString(string);
- }
+ // ----------------------------------------------------------------------
- // Constructors
- // ----------------------------------------------------------------------
-
- public SimpleString(final String string)
- {
- int len = string.length();
-
- data = new byte[len << 1];
-
- int j = 0;
-
- for (int i = 0; i < len; i++)
- {
- char c = string.charAt(i);
-
- byte low = (byte)(c & 0xFF); // low byte
-
- data[j++] = low;
-
- byte high = (byte)(c >> 8 & 0xFF); // high byte
-
- data[j++] = high;
- }
-
- str = string;
- }
-
- public SimpleString(final byte[] data)
- {
- this.data = data;
- }
-
- // CharSequence implementation
- // ---------------------------------------------------------------------------
-
- public int length()
- {
- return data.length >> 1;
- }
-
- public char charAt(int pos)
- {
- if (pos < 0 || pos >= data.length >> 1)
- {
- throw new IndexOutOfBoundsException();
- }
- pos <<= 1;
-
- return (char)(data[pos] | data[pos + 1] << 8);
- }
-
- public CharSequence subSequence(final int start, final int end)
- {
- int len = data.length >> 1;
+ /**
+ * Returns a SimpleString constructed from the <code>string</code> parameter.
+ * If <code>string</code> is <code>null</code>, the return value will be <code>null</code> too.
+ */
+ public static SimpleString toSimpleString(final String string)
+ {
+ if (string == null)
+ {
+ return null;
+ }
+ return new SimpleString(string);
+ }
- if (end < start || start < 0 || end > len)
- {
- throw new IndexOutOfBoundsException();
- }
- else
- {
- int newlen = (end - start) << 1;
- byte[] bytes = new byte[newlen];
-
- System.arraycopy(data, start << 1, bytes, 0, newlen);
-
- return new SimpleString(bytes);
- }
- }
-
- // Comparable implementation -------------------------------------
-
+ // Constructors
+ // ----------------------------------------------------------------------
+
+ public SimpleString(final String string)
+ {
+ int len = string.length();
+
+ data = new byte[len << 1];
+
+ int j = 0;
+
+ for (int i = 0; i < len; i++)
+ {
+ char c = string.charAt(i);
+
+ byte low = (byte)(c & 0xFF); // low byte
+
+ data[j++] = low;
+
+ byte high = (byte)(c >> 8 & 0xFF); // high byte
+
+ data[j++] = high;
+ }
+
+ str = string;
+ }
+
+ public SimpleString(final byte[] data)
+ {
+ this.data = data;
+ }
+
+ // CharSequence implementation
+ // ---------------------------------------------------------------------------
+
+ public int length()
+ {
+ return data.length >> 1;
+ }
+
+ public char charAt(int pos)
+ {
+ if (pos < 0 || pos >= data.length >> 1)
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ pos <<= 1;
+
+ return (char)(data[pos] | data[pos + 1] << 8);
+ }
+
+ public CharSequence subSequence(final int start, final int end)
+ {
+ int len = data.length >> 1;
+
+ if (end < start || start < 0 || end > len)
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ else
+ {
+ int newlen = (end - start) << 1;
+ byte[] bytes = new byte[newlen];
+
+ System.arraycopy(data, start << 1, bytes, 0, newlen);
+
+ return new SimpleString(bytes);
+ }
+ }
+
+ // Comparable implementation -------------------------------------
+
public int compareTo(SimpleString o)
{
return toString().compareTo(o.toString());
}
-
- // Public
- // ---------------------------------------------------------------------------
-
- public byte[] getData()
- {
- return data;
- }
-
- public boolean startsWith(final SimpleString other)
- {
- byte[] otherdata = other.data;
-
- if (otherdata.length > this.data.length)
- {
- return false;
- }
-
- for (int i = 0; i < otherdata.length; i++)
- {
- if (this.data[i] != otherdata[i])
- {
- return false;
- }
- }
-
- return true;
- }
-
- public String toString()
- {
- if (str == null)
- {
- int len = data.length >> 1;
-
- char[] chars = new char[len];
-
- int j = 0;
-
- for (int i = 0; i < len; i++)
- {
- int low = data[j++] & 0xFF;
-
- int high = (data[j++] << 8) & 0xFF00 ;
-
- chars[i] = (char)(low | high);
- }
-
- str = new String(chars);
- }
-
- return str;
- }
-
- public boolean equals(Object other)
- {
- if (other instanceof SimpleString)
- {
- SimpleString s = (SimpleString)other;
-
- if (data.length != s.data.length)
- {
- return false;
- }
-
- for (int i = 0; i < data.length; i++)
- {
- if (data[i] != s.data[i])
- {
- return false;
- }
- }
-
- return true;
- }
- else
- {
- return false;
- }
- }
-
- public int hashCode()
- {
- if (hash == 0)
- {
- for (int i = 0; i < data.length; i++)
- {
+
+ // Public
+ // ---------------------------------------------------------------------------
+
+ public byte[] getData()
+ {
+ return data;
+ }
+
+ public boolean startsWith(final SimpleString other)
+ {
+ byte[] otherdata = other.data;
+
+ if (otherdata.length > this.data.length)
+ {
+ return false;
+ }
+
+ for (int i = 0; i < otherdata.length; i++)
+ {
+ if (this.data[i] != otherdata[i])
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public String toString()
+ {
+ if (str == null)
+ {
+ int len = data.length >> 1;
+
+ char[] chars = new char[len];
+
+ int j = 0;
+
+ for (int i = 0; i < len; i++)
+ {
+ int low = data[j++] & 0xFF;
+
+ int high = (data[j++] << 8) & 0xFF00;
+
+ chars[i] = (char)(low | high);
+ }
+
+ str = new String(chars);
+ }
+
+ return str;
+ }
+
+ public boolean equals(Object other)
+ {
+ if (other instanceof SimpleString)
+ {
+ SimpleString s = (SimpleString)other;
+
+ if (data.length != s.data.length)
+ {
+ return false;
+ }
+
+ for (int i = 0; i < data.length; i++)
+ {
+ if (data[i] != s.data[i])
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public int hashCode()
+ {
+ if (hash == 0)
+ {
+ for (int i = 0; i < data.length; i++)
+ {
hash = 31 * hash + data[i];
- }
- }
-
- return hash;
- }
+ }
+ }
+ return hash;
+ }
+
public SimpleString[] split(char delim)
{
- if(!contains(delim))
+ if (!contains(delim))
{
- return new SimpleString[]{this};
+ return new SimpleString[] { this };
}
else
{
List<SimpleString> all = new ArrayList<SimpleString>();
int lasPos = 0;
- for (int i = 0; i < data.length; i+=2)
+ for (int i = 0; i < data.length; i += 2)
{
- byte low = (byte)(delim & 0xFF); // low byte
- byte high = (byte)(delim >> 8 & 0xFF); // high byte
- if (data[i] == low && data[i+1] == high)
+ byte low = (byte)(delim & 0xFF); // low byte
+ byte high = (byte)(delim >> 8 & 0xFF); // high byte
+ if (data[i] == low && data[i + 1] == high)
{
byte[] bytes = new byte[i - lasPos];
System.arraycopy(data, lasPos, bytes, 0, bytes.length);
@@ -273,14 +271,13 @@
}
}
-
public boolean contains(char c)
{
- for (int i = 0; i < data.length; i+=2)
+ for (int i = 0; i < data.length; i += 2)
{
- byte low = (byte)(c & 0xFF); // low byte
- byte high = (byte)(c >> 8 & 0xFF); // high byte
- if (data[i] == low && data[i+1] == high)
+ byte low = (byte)(c & 0xFF); // low byte
+ byte high = (byte)(c >> 8 & 0xFF); // high byte
+ if (data[i] == low && data[i + 1] == high)
{
return true;
}
@@ -288,7 +285,7 @@
return false;
}
- public SimpleString concat(SimpleString toAdd)
+ public SimpleString concat(final SimpleString toAdd)
{
byte[] bytes = new byte[data.length + toAdd.getData().length];
System.arraycopy(data, 0, bytes, 0, data.length);
@@ -296,20 +293,20 @@
return new SimpleString(bytes);
}
- public SimpleString concat(char c)
+ public SimpleString concat(final char c)
{
byte[] bytes = new byte[data.length + 2];
System.arraycopy(data, 0, bytes, 0, data.length);
bytes[data.length] = (byte)(c & 0xFF);
- bytes[data.length+1] = (byte)(c >> 8 & 0xFF);
+ bytes[data.length + 1] = (byte)(c >> 8 & 0xFF);
return new SimpleString(bytes);
}
public static int sizeofString(final SimpleString str)
- {
- return SIZE_INT + str.data.length;
- }
-
+ {
+ return SIZE_INT + str.data.length;
+ }
+
public static int sizeofNullableString(final SimpleString str)
{
if (str == null)
@@ -322,28 +319,31 @@
}
}
- public void getChars(int srcBegin, int srcEnd, char dst[], int dstBegin) {
- if (srcBegin < 0) {
- throw new StringIndexOutOfBoundsException(srcBegin);
+ public void getChars(int srcBegin, int srcEnd, char dst[], int dstBegin)
+ {
+ if (srcBegin < 0)
+ {
+ throw new StringIndexOutOfBoundsException(srcBegin);
}
- if (srcEnd > length()) {
- throw new StringIndexOutOfBoundsException(srcEnd);
+ if (srcEnd > length())
+ {
+ throw new StringIndexOutOfBoundsException(srcEnd);
}
- if (srcBegin > srcEnd) {
- throw new StringIndexOutOfBoundsException(srcEnd - srcBegin);
+ if (srcBegin > srcEnd)
+ {
+ throw new StringIndexOutOfBoundsException(srcEnd - srcBegin);
}
-
+
int j = 0;
- for (int i = srcBegin; i < srcEnd - srcBegin; i++)
+ for (int i = srcBegin; i < srcEnd - srcBegin; i++)
{
int low = data[j++] & 0xFF;
-
- int high = (data[j++] << 8) & 0xFF00 ;
-
+
+ int high = (data[j++] << 8) & 0xFF00;
+
dst[i] = (char)(low | high);
}
- }
+ }
-
}
\ No newline at end of file
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -85,19 +85,19 @@
message = createMessage(session, 1);
SimpleString dupID = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
message2 = consumer.receive(1000);
assertEquals(1, message2.getProperty(propKey));
message = createMessage(session, 2);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
message2 = consumer.receive(250);
assertNull(message2);
message = createMessage(session, 3);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
message2 = consumer.receive(250);
assertNull(message2);
@@ -106,19 +106,19 @@
message = createMessage(session, 4);
SimpleString dupID2 = new SimpleString("hijklmnop");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
message2 = consumer.receive(1000);
assertEquals(4, message2.getProperty(propKey));
message = createMessage(session, 5);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
message2 = consumer.receive(1000);
assertNull(message2);
message = createMessage(session, 6);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
message2 = consumer.receive(250);
assertNull(message2);
@@ -163,7 +163,7 @@
ClientMessage message = createMessage(session, i);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer1.send(message);
producer2.send(message);
@@ -183,8 +183,6 @@
assertEquals(i, message.getProperty(propKey));
}
- DuplicateIDCacheImpl.dumpCaches();
-
log.info("Now sending more");
for (int i = 0; i < cacheSize; i++)
{
@@ -192,7 +190,7 @@
ClientMessage message = createMessage(session, i);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer1.send(message);
producer2.send(message);
@@ -212,7 +210,7 @@
message = createMessage(session, i);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer1.send(message);
producer2.send(message);
@@ -238,7 +236,7 @@
message = createMessage(session, i);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer1.send(message);
producer2.send(message);
@@ -260,7 +258,7 @@
message = createMessage(session, i);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer1.send(message);
producer2.send(message);
@@ -301,7 +299,7 @@
ClientMessage message = createMessage(session, 0);
SimpleString dupID = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.close();
@@ -317,7 +315,7 @@
// Should be able to resend it and not get rejected since transaction didn't commit
message = createMessage(session, 1);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.commit();
@@ -351,7 +349,7 @@
ClientMessage message = createMessage(session, 0);
SimpleString dupID = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.rollback();
@@ -359,7 +357,7 @@
// Should be able to resend it and not get rejected since transaction didn't commit
message = createMessage(session, 1);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.commit();
@@ -393,12 +391,12 @@
ClientMessage message = createMessage(session, 0);
SimpleString dupID1 = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID1);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID1.getData());
producer.send(message);
message = createMessage(session, 1);
SimpleString dupID2 = new SimpleString("hijklmno");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
session.commit();
@@ -406,11 +404,11 @@
// These next two should get rejected
message = createMessage(session, 2);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID1);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID1.getData());
producer.send(message);
message = createMessage(session, 3);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
session.commit();
@@ -449,7 +447,7 @@
ClientMessage message = createMessage(session, 0);
SimpleString dupID = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.commit();
@@ -463,7 +461,7 @@
producer = session.createProducer(queueName);
message = createMessage(session, 1);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
message = createMessage(session, 2);
@@ -510,7 +508,7 @@
ClientMessage message = createMessage(session, 0);
SimpleString dupID = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.end(xid, XAResource.TMSUCCESS);
@@ -532,7 +530,7 @@
// Should be able to resend it and not get rejected since transaction didn't commit
message = createMessage(session, 1);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.end(xid2, XAResource.TMSUCCESS);
@@ -585,7 +583,7 @@
ClientMessage message = createMessage(session, 0);
SimpleString dupID = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.end(xid, XAResource.TMSUCCESS);
@@ -609,7 +607,7 @@
// Should be able to resend it and not get rejected since transaction didn't commit
message = createMessage(session, 1);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.end(xid2, XAResource.TMSUCCESS);
@@ -662,7 +660,7 @@
ClientMessage message = createMessage(session, 0);
SimpleString dupID = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.end(xid, XAResource.TMSUCCESS);
@@ -686,7 +684,7 @@
// Should NOT be able to resend it
message = createMessage(session, 1);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.end(xid2, XAResource.TMSUCCESS);
@@ -738,7 +736,7 @@
ClientMessage message = createMessage(session, 0);
SimpleString dupID = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.end(xid, XAResource.TMSUCCESS);
@@ -764,7 +762,7 @@
// Should NOT be able to resend it
message = createMessage(session, 1);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.end(xid2, XAResource.TMSUCCESS);
@@ -833,14 +831,14 @@
ClientMessage message = createMessage(session, 1);
SimpleString dupID = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
ClientMessage message2 = consumer.receive(1000);
assertEquals(1, message2.getProperty(propKey));
message = createMessage(session, 2);
SimpleString dupID2 = new SimpleString("hijklmnopqr");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
message2 = consumer.receive(1000);
assertEquals(2, message2.getProperty(propKey));
@@ -868,13 +866,13 @@
consumer = session.createConsumer(queueName);
message = createMessage(session, 1);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
message2 = consumer.receive(200);
assertNull(message2);
message = createMessage(session, 2);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
message2 = consumer.receive(200);
assertNull(message2);
@@ -918,7 +916,7 @@
{
ClientMessage message = createMessage(session, i);
SimpleString dupID = new SimpleString("abcdefg" + i);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
ClientMessage message2 = consumer.receive(1000);
assertEquals(i, message2.getProperty(propKey));
@@ -950,7 +948,7 @@
{
ClientMessage message = createMessage(session, i);
SimpleString dupID = new SimpleString("abcdefg" + i);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
ClientMessage message2 = consumer.receive(100);
assertNull(message2);
@@ -996,7 +994,7 @@
{
ClientMessage message = createMessage(session, i);
SimpleString dupID = new SimpleString("abcdefg" + i);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
ClientMessage message2 = consumer.receive(1000);
assertEquals(i, message2.getProperty(propKey));
@@ -1030,7 +1028,7 @@
{
ClientMessage message = createMessage(session, i);
SimpleString dupID = new SimpleString("abcdefg" + i);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
if (i >= subsequentCacheSize)
{
@@ -1085,7 +1083,7 @@
{
ClientMessage message = createMessage(session, i);
SimpleString dupID = new SimpleString("abcdefg" + i);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
ClientMessage message2 = consumer.receive(1000);
assertEquals(i, message2.getProperty(propKey));
@@ -1130,7 +1128,7 @@
{
ClientMessage message = createMessage(session, i);
SimpleString dupID = new SimpleString("abcdefg" + i);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
if (i >= subsequentCacheSize)
{
@@ -1182,14 +1180,14 @@
ClientMessage message = createMessage(session, 1);
SimpleString dupID = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
ClientMessage message2 = consumer.receive(1000);
assertEquals(1, message2.getProperty(propKey));
message = createMessage(session, 2);
SimpleString dupID2 = new SimpleString("hijklmnopqr");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
message2 = consumer.receive(1000);
assertEquals(2, message2.getProperty(propKey));
@@ -1217,13 +1215,13 @@
consumer = session.createConsumer(queueName);
message = createMessage(session, 1);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
message2 = consumer.receive(200);
assertEquals(1, message2.getProperty(propKey));
message = createMessage(session, 2);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
message2 = consumer.receive(200);
assertEquals(2, message2.getProperty(propKey));
@@ -1265,7 +1263,7 @@
ClientMessage message = createMessage(session, 1);
SimpleString dupID = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.commit();
ClientMessage message2 = consumer.receive(1000);
@@ -1273,7 +1271,7 @@
message = createMessage(session, 2);
SimpleString dupID2 = new SimpleString("hijklmnopqr");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
session.commit();
message2 = consumer.receive(1000);
@@ -1302,14 +1300,14 @@
consumer = session.createConsumer(queueName);
message = createMessage(session, 1);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.commit();
message2 = consumer.receive(200);
assertEquals(1, message2.getProperty(propKey));
message = createMessage(session, 2);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
session.commit();
message2 = consumer.receive(200);
@@ -1350,7 +1348,7 @@
ClientMessage message = createMessage(session, 1);
SimpleString dupID = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.commit();
ClientMessage message2 = consumer.receive(1000);
@@ -1360,7 +1358,7 @@
message = createMessage(session, 2);
SimpleString dupID2 = new SimpleString("hijklmnopqr");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
session.commit();
message2 = consumer.receive(1000);
@@ -1391,14 +1389,14 @@
consumer = session.createConsumer(queueName);
message = createMessage(session, 1);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.commit();
message2 = consumer.receive(200);
assertNull(message2);
message = createMessage(session, 2);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
session.commit();
message2 = consumer.receive(200);
@@ -1445,12 +1443,12 @@
ClientMessage message = createMessage(session, 1);
SimpleString dupID = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
message = createMessage(session, 2);
SimpleString dupID2 = new SimpleString("hijklmnopqr");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
session.end(xid, XAResource.TMSUCCESS);
@@ -1484,11 +1482,11 @@
consumer = session.createConsumer(queueName);
message = createMessage(session, 1);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
message = createMessage(session, 2);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
session.end(xid2, XAResource.TMSUCCESS);
@@ -1544,12 +1542,12 @@
ClientMessage message = createMessage(session, 1);
SimpleString dupID = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
message = createMessage(session, 2);
SimpleString dupID2 = new SimpleString("hijklmnopqr");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
session.end(xid, XAResource.TMSUCCESS);
@@ -1581,11 +1579,11 @@
consumer = session.createConsumer(queueName);
message = createMessage(session, 1);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
message = createMessage(session, 2);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
session.end(xid2, XAResource.TMSUCCESS);
@@ -1641,12 +1639,12 @@
ClientMessage message = createMessage(session, 1);
SimpleString dupID = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
message = createMessage(session, 2);
SimpleString dupID2 = new SimpleString("hijklmnopqr");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
session.end(xid, XAResource.TMSUCCESS);
@@ -1680,11 +1678,11 @@
consumer = session.createConsumer(queueName);
message = createMessage(session, 1);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
message = createMessage(session, 2);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
session.end(xid2, XAResource.TMSUCCESS);
@@ -1740,12 +1738,12 @@
ClientMessage message = createMessage(session, 1);
SimpleString dupID = new SimpleString("abcdefg");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
message = createMessage(session, 2);
SimpleString dupID2 = new SimpleString("hijklmnopqr");
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
session.end(xid, XAResource.TMSUCCESS);
@@ -1778,11 +1776,11 @@
consumer = session.createConsumer(queueName);
message = createMessage(session, 1);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
message = createMessage(session, 2);
- message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
session.end(xid2, XAResource.TMSUCCESS);
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -35,6 +35,24 @@
public class FakeBinding implements Binding
{
+ public int getID()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public void setID(int id)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void willRoute(ServerMessage message)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
public boolean filterMatches(ServerMessage message) throws Exception
{
// TODO Auto-generated method stub
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java 2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java 2009-01-29 18:24:44 UTC (rev 5760)
@@ -250,53 +250,53 @@
verify(mbeanServer);
}
- public void testRegisterQueue() throws Exception
- {
- SimpleString address = randomSimpleString();
- SimpleString name = randomSimpleString();
- ObjectName objectName = ManagementServiceImpl.getQueueObjectName(address, name);
- ObjectInstance objectInstance = new ObjectInstance(objectName, QueueControl.class.getName());
+// public void testRegisterQueue() throws Exception
+// {
+// SimpleString address = randomSimpleString();
+// SimpleString name = randomSimpleString();
+// ObjectName objectName = ManagementServiceImpl.getQueueObjectName(address, name);
+// ObjectInstance objectInstance = new ObjectInstance(objectName, QueueControl.class.getName());
+//
+// MBeanServer mbeanServer = createMock(MBeanServer.class);
+// Queue queue = createMock(Queue.class);
+// expect(queue.getName()).andStubReturn(name);
+// expect(queue.isDurable()).andReturn(true);
+// StorageManager storageManager = createMock(StorageManager.class);
+// expect(mbeanServer.isRegistered(objectName)).andReturn(false);
+// expect(mbeanServer.registerMBean(isA(StandardMBean.class), eq(objectName))).andReturn(objectInstance);
+//
+// replay(mbeanServer, queue, storageManager);
+//
+// ManagementService service = new ManagementServiceImpl(mbeanServer, true);
+// service.registerQueue(queue, address, storageManager);
+//
+// verify(mbeanServer, queue, storageManager);
+// }
+//
+// public void testRegisterAlreadyRegisteredQueue() throws Exception
+// {
+// SimpleString address = randomSimpleString();
+// SimpleString name = randomSimpleString();
+// ObjectName objectName = ManagementServiceImpl.getQueueObjectName(address, name);
+// ObjectInstance objectInstance = new ObjectInstance(objectName, QueueControl.class.getName());
+//
+// MBeanServer mbeanServer = createMock(MBeanServer.class);
+// Queue queue = createMock(Queue.class);
+// expect(queue.getName()).andStubReturn(name);
+// expect(queue.isDurable()).andReturn(true);
+// StorageManager storageManager = createMock(StorageManager.class);
+// expect(mbeanServer.isRegistered(objectName)).andReturn(true);
+// mbeanServer.unregisterMBean(objectName);
+// expect(mbeanServer.registerMBean(isA(StandardMBean.class), eq(objectName))).andReturn(objectInstance);
+//
+// replay(mbeanServer, queue, storageManager);
+//
+// ManagementService service = new ManagementServiceImpl(mbeanServer, true);
+// service.registerQueue(queue, address, storageManager);
+//
+// verify(mbeanServer, queue, storageManager);
+// }
- MBeanServer mbeanServer = createMock(MBeanServer.class);
- Queue queue = createMock(Queue.class);
- expect(queue.getName()).andStubReturn(name);
- expect(queue.isDurable()).andReturn(true);
- StorageManager storageManager = createMock(StorageManager.class);
- expect(mbeanServer.isRegistered(objectName)).andReturn(false);
- expect(mbeanServer.registerMBean(isA(StandardMBean.class), eq(objectName))).andReturn(objectInstance);
-
- replay(mbeanServer, queue, storageManager);
-
- ManagementService service = new ManagementServiceImpl(mbeanServer, true);
- service.registerQueue(queue, address, storageManager);
-
- verify(mbeanServer, queue, storageManager);
- }
-
- public void testRegisterAlreadyRegisteredQueue() throws Exception
- {
- SimpleString address = randomSimpleString();
- SimpleString name = randomSimpleString();
- ObjectName objectName = ManagementServiceImpl.getQueueObjectName(address, name);
- ObjectInstance objectInstance = new ObjectInstance(objectName, QueueControl.class.getName());
-
- MBeanServer mbeanServer = createMock(MBeanServer.class);
- Queue queue = createMock(Queue.class);
- expect(queue.getName()).andStubReturn(name);
- expect(queue.isDurable()).andReturn(true);
- StorageManager storageManager = createMock(StorageManager.class);
- expect(mbeanServer.isRegistered(objectName)).andReturn(true);
- mbeanServer.unregisterMBean(objectName);
- expect(mbeanServer.registerMBean(isA(StandardMBean.class), eq(objectName))).andReturn(objectInstance);
-
- replay(mbeanServer, queue, storageManager);
-
- ManagementService service = new ManagementServiceImpl(mbeanServer, true);
- service.registerQueue(queue, address, storageManager);
-
- verify(mbeanServer, queue, storageManager);
- }
-
public void testUnregisterQueue() throws Exception
{
SimpleString address = randomSimpleString();
More information about the jboss-cvs-commits
mailing list