[jboss-cvs] JBoss Messaging SVN: r6789 - in trunk: src/main/org/jboss/messaging/core/client/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu May 14 07:07:25 EDT 2009
Author: timfox
Date: 2009-05-14 07:07:24 -0400 (Thu, 14 May 2009)
New Revision: 6789
Modified:
trunk/examples/jms/instantiate-connection-factory/src/org/jboss/jms/example/InstantiateConnectionFactoryExample.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java
Log:
A few tweaks, and allow the duplicate detection ID to be set as a string (SimpleString) as well as a byte array.
Modified: trunk/examples/jms/instantiate-connection-factory/src/org/jboss/jms/example/InstantiateConnectionFactoryExample.java
===================================================================
--- trunk/examples/jms/instantiate-connection-factory/src/org/jboss/jms/example/InstantiateConnectionFactoryExample.java 2009-05-14 11:01:19 UTC (rev 6788)
+++ trunk/examples/jms/instantiate-connection-factory/src/org/jboss/jms/example/InstantiateConnectionFactoryExample.java 2009-05-14 11:07:24 UTC (rev 6789)
@@ -68,7 +68,6 @@
// The server port etc.
Map<String, Object> connectionParams = new HashMap<String, Object>();
-
connectionParams.put(PORT_PROP_NAME, 5446);
TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName(),
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-05-14 11:01:19 UTC (rev 6788)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-05-14 11:07:24 UTC (rev 6789)
@@ -58,7 +58,7 @@
public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = "org.jboss.messaging.core.client.impl.RoundRobinConnectionLoadBalancingPolicy";
- public static final long DEFAULT_PING_PERIOD = 1000000;
+ public static final long DEFAULT_PING_PERIOD = 5000;
// 5 minutes - normally this should be much higher than ping period, this allows clients to re-attach on live
// or backup without fear of session having already been closed when connection times out.
@@ -107,7 +107,7 @@
public static final int DEFAULT_THREAD_POOL_MAX_SIZE = -1;
- public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 1;
+ public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 2;
// Attributes
// -----------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-05-14 11:01:19 UTC (rev 6788)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-05-14 11:07:24 UTC (rev 6789)
@@ -365,7 +365,7 @@
ByteBuffer buff = ByteBuffer.wrap(bytes);
buff.putLong(msg.getMessageID());
-
+
msg.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, bytes);
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-05-14 11:01:19 UTC (rev 6788)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-05-14 11:07:24 UTC (rev 6789)
@@ -576,7 +576,9 @@
{
SimpleString address = message.getDestination();
- byte[] duplicateID = (byte[])message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
+ byte[] duplicateIDBytes = null;
+
+ Object duplicateID = message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
DuplicateIDCache cache = null;
@@ -584,8 +586,17 @@
{
cache = getDuplicateIDCache(message.getDestination());
- if (cache.contains(duplicateID))
+ if (duplicateID instanceof SimpleString)
{
+ duplicateIDBytes = ((SimpleString)duplicateID).getData();
+ }
+ else
+ {
+ duplicateIDBytes = (byte[])duplicateID;
+ }
+
+ if (cache.contains(duplicateIDBytes))
+ {
if (tx == null)
{
log.trace("Duplicate message detected - message will not be routed");
@@ -614,7 +625,7 @@
startedTx = true;
}
- cache.addToCache(duplicateID, tx);
+ cache.addToCache(duplicateIDBytes, tx);
}
if (tx == null)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-05-14 11:01:19 UTC (rev 6788)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-05-14 11:07:24 UTC (rev 6789)
@@ -137,12 +137,9 @@
private final Binding binding;
- private MessagingServer server;
-
// Constructors ---------------------------------------------------------------------------------
- public ServerConsumerImpl(final MessagingServer server,
- final long id,
+ public ServerConsumerImpl(final long id,
final long replicatedSessionID,
final ServerSession session,
final QueueBinding binding,
@@ -158,8 +155,6 @@
final Executor executor,
final ManagementService managementService) throws Exception
{
- this.server = server;
-
this.id = id;
this.replicatedSessionID = replicatedSessionID;
@@ -190,11 +185,11 @@
this.managementService = managementService;
- binding.getQueue().addConsumer(this);
+ this.minLargeMessageSize = session.getMinLargeMessageSize();
- minLargeMessageSize = session.getMinLargeMessageSize();
-
this.updateDeliveries = updateDeliveries;
+
+ binding.getQueue().addConsumer(this);
}
// ServerConsumer implementation
@@ -271,7 +266,7 @@
{
return deliveringRefs.size();
}
-
+
public LinkedList<MessageReference> cancelRefs(final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
{
boolean performACK = lastConsumedAsDelivered;
@@ -334,9 +329,13 @@
if (trace)
{
- log.trace("Received " + credits + " credits, previous value = " + previous + " currentValue = " + availableCredits.get());
+ log.trace("Received " + credits +
+ " credits, previous value = " +
+ previous +
+ " currentValue = " +
+ availableCredits.get());
}
-
+
if (previous <= 0 && previous + credits > 0)
{
promptDelivery();
@@ -372,7 +371,8 @@
messageID +
" backup = " +
messageQueue.isBackup() +
- " queue = " + messageQueue.getName() +
+ " queue = " +
+ messageQueue.getName() +
" closed = " +
closed);
}
@@ -451,7 +451,8 @@
if (ref == null)
{
throw new IllegalStateException("Cannot find Reference[" + messageID +
- "] after depaging on Queue " + messageQueue.getName());
+ "] after depaging on Queue " +
+ messageQueue.getName());
}
}
}
@@ -902,7 +903,7 @@
pendingLargeMessage.releaseResources();
int counter = pendingLargeMessage.decrementRefCount();
-
+
if (preAcknowledge && !browseOnly)
{
// PreAck will have an extra reference
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-05-14 11:01:19 UTC (rev 6788)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-05-14 11:07:24 UTC (rev 6789)
@@ -11,6 +11,20 @@
package org.jboss.messaging.core.server.impl;
+import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CREATED;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import org.jboss.messaging.core.buffers.ChannelBuffers;
import org.jboss.messaging.core.client.impl.ClientMessageImpl;
import org.jboss.messaging.core.client.management.impl.ManagementHelper;
@@ -20,7 +34,6 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.ManagementService;
import org.jboss.messaging.core.management.Notification;
-import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CREATED;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.BindingType;
@@ -85,17 +98,6 @@
import org.jboss.messaging.utils.SimpleString;
import org.jboss.messaging.utils.TypedProperties;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-
/*
* Session implementation
*
@@ -487,7 +489,7 @@
{
if (replicatingChannel == null)
{
- doHandleCommit(packet);
+ doHandleCommit(packet);
}
else
{
@@ -503,8 +505,7 @@
public void handleRollback(final RollbackMessage packet)
{
-
-
+
if (replicatingChannel == null)
{
doHandleRollback(packet);
@@ -625,7 +626,7 @@
public void handleXARollback(final SessionXARollbackMessage packet)
{
-
+
if (replicatingChannel == null)
{
doHandleXARollback(packet);
@@ -871,7 +872,7 @@
public void handleClose(final Packet packet)
{
-
+
if (replicatingChannel == null)
{
doHandleClose(packet);
@@ -912,7 +913,7 @@
public void handleCloseConsumer(final SessionConsumerCloseMessage packet)
{
final ServerConsumer consumer = consumers.get(packet.getConsumerID());
-
+
consumer.setStarted(false);
if (replicatingChannel == null)
@@ -922,7 +923,7 @@
else
{
final Queue queue;
-
+
if (consumer.getCountOfPendingDeliveries() > 0)
{
queue = consumer.getQueue();
@@ -932,7 +933,7 @@
{
queue = null;
}
-
+
// We need to stop the consumer first before replicating, to ensure no deliveries occur after this,
// but we need to process the actual close() when the replication response returns, otherwise things
// can happen like acks can come in after close
@@ -1294,8 +1295,7 @@
theQueue = (Queue)binding.getBindable();
}
- ServerConsumer consumer = new ServerConsumerImpl(server,
- idGenerator.generateID(),
+ ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
oppositeChannelID,
this,
(QueueBinding)binding,
@@ -2529,12 +2529,12 @@
private HashSet<Queue> lockUsedQueues(Xid xid)
{
final HashSet<Queue> queues = new HashSet<Queue>();
-
+
for (ServerConsumer consumer : consumers.values())
{
queues.add(consumer.getQueue());
}
-
+
Transaction localTX;
if (xid == null)
{
@@ -2544,34 +2544,16 @@
{
localTX = resourceManager.getTransaction(xid);
}
-
+
if (localTX != null)
{
queues.addAll(localTX.getDistinctQueues());
}
-
+
for (Queue queue : queues)
{
queue.lockDelivery();
}
return queues;
}
-
-
- private void doSecurity(final ServerMessage msg) throws Exception
- {
- try
- {
- securityStore.check(msg.getDestination(), CheckType.SEND, this);
- }
- catch (MessagingException e)
- {
- if (!autoCommitSends)
- {
- tx.markAsRollbackOnly(e);
- }
- throw e;
- }
- }
-
}
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java 2009-05-14 11:01:19 UTC (rev 6788)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java 2009-05-14 11:07:24 UTC (rev 6789)
@@ -64,7 +64,7 @@
// Constants -----------------------------------------------------
public static final SimpleString REPLYTO_HEADER_NAME = ClientMessageImpl.REPLYTO_HEADER_NAME;
-
+
public static final SimpleString CORRELATIONID_HEADER_NAME = new SimpleString("JMSCorrelationID");
public static final SimpleString JBM_MESSAGE_ID = new SimpleString("JMSMessageID");
@@ -78,7 +78,7 @@
private static final SimpleString JMS_ = new SimpleString("JMS_");
public static final String JMSXDELIVERYCOUNT = "JMSXDeliveryCount";
-
+
public static final String JMS_JBM_INPUT_STREAM = "JMS_JBM_InputStream";
public static final String JMS_JBM_OUTPUT_STREAM = "JMS_JBM_OutputStream";
@@ -91,7 +91,7 @@
public static final String JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST = "JBM_BRIDGE_MSG_ID_LIST";
public static final byte TYPE = 0;
-
+
public static Map<String, Object> coreMaptoJMSMap(final Map<String, Object> coreMessage)
{
Map<String, Object> jmsMessage = new HashMap<String, Object>();
@@ -122,7 +122,7 @@
jmsMessage.put(entry.getKey(), entry.getValue());
}
}
-
+
return jmsMessage;
}
@@ -154,17 +154,17 @@
switch (type)
{
- case JBossMessage.TYPE: //0
+ case JBossMessage.TYPE: // 0
{
msg = new JBossMessage(message, session);
break;
}
- case JBossBytesMessage.TYPE: //4
+ case JBossBytesMessage.TYPE: // 4
{
msg = new JBossBytesMessage(message, session);
break;
}
- case JBossMapMessage.TYPE: //5
+ case JBossMapMessage.TYPE: // 5
{
msg = new JBossMapMessage(message, session);
break;
@@ -174,12 +174,12 @@
msg = new JBossObjectMessage(message, session);
break;
}
- case JBossStreamMessage.TYPE: //6
+ case JBossStreamMessage.TYPE: // 6
{
msg = new JBossStreamMessage(message, session);
break;
}
- case JBossTextMessage.TYPE: //3
+ case JBossTextMessage.TYPE: // 3
{
msg = new JBossTextMessage(message, session);
break;
@@ -577,7 +577,7 @@
{
message.removeProperty(propName);
}
-
+
propertiesReadOnly = false;
}
@@ -804,12 +804,11 @@
{
return message.getBodyInputStream();
}
- else
- if (JMSXDELIVERYCOUNT.equals(name))
+ else if (JMSXDELIVERYCOUNT.equals(name))
{
return String.valueOf(message.getDeliveryCount());
}
-
+
Object val = message.getProperty(new SimpleString(name));
if (val instanceof SimpleString)
{
@@ -899,28 +898,28 @@
public void setObjectProperty(final String name, final Object value) throws JMSException
{
-
if (JMS_JBM_OUTPUT_STREAM.equals(name))
{
- this.setOutputStream((OutputStream)value);
+ setOutputStream((OutputStream)value);
+
return;
}
- else
- if (JMS_JBM_SAVE_STREAM.equals(name))
+ else if (JMS_JBM_SAVE_STREAM.equals(name))
{
- this.saveToOutputStream((OutputStream)value);
+ saveToOutputStream((OutputStream)value);
+
return;
}
-
+
checkProperty(name, value);
-
if (JMS_JBM_INPUT_STREAM.equals(name))
{
- this.setInputStream((InputStream)value);
+ setInputStream((InputStream)value);
+
return;
}
-
+
SimpleString key = new SimpleString(name);
if (value instanceof Boolean)
@@ -1001,8 +1000,7 @@
{
return JBossMessage.TYPE;
}
-
-
+
public void setInputStream(final InputStream input) throws JMSException
{
checkStream();
@@ -1013,7 +1011,7 @@
message.setBodyInputStream(input);
}
-
+
public void setOutputStream(final OutputStream output) throws JMSException
{
checkStream();
@@ -1021,7 +1019,7 @@
{
throw new IllegalStateException("OutputStream property is only valid on received messages");
}
-
+
try
{
message.setOutputStream(output);
@@ -1039,7 +1037,7 @@
{
throw new IllegalStateException("OutputStream property is only valid on received messages");
}
-
+
try
{
message.saveToOutputStream(output);
@@ -1062,7 +1060,6 @@
throw JMSExceptionHelper.convertFromMessagingException(e);
}
}
-
public String toString()
{
@@ -1093,7 +1090,7 @@
throw new MessageNotReadableException("Message is write-only");
}
}
-
+
protected MessagingBuffer getBody()
{
return message.getBody();
@@ -1108,7 +1105,7 @@
throw new IllegalStateException("LargeMessage streaming is only possible on ByteMessage or StreamMessage");
}
}
-
+
private void checkProperty(final String name, final Object value) throws JMSException
{
if (propertiesReadOnly)
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java 2009-05-14 11:01:19 UTC (rev 6788)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java 2009-05-14 11:07:24 UTC (rev 6789)
@@ -126,7 +126,73 @@
sf.close();
}
+
+ public void testSimpleDuplicateDetectionWithString() throws Exception
+ {
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.start();
+
+ final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
+
+ session.createQueue(queueName, queueName, null, false);
+
+ ClientProducer producer = session.createProducer(queueName);
+
+ ClientConsumer consumer = session.createConsumer(queueName);
+
+ ClientMessage message = createMessage(session, 0);
+ producer.send(message);
+ ClientMessage message2 = consumer.receive(1000);
+ assertEquals(0, message2.getProperty(propKey));
+
+ message = createMessage(session, 1);
+ SimpleString dupID = new SimpleString("abcdefg");
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ producer.send(message);
+ message2 = consumer.receive(1000);
+ assertEquals(1, message2.getProperty(propKey));
+
+ message = createMessage(session, 2);
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ producer.send(message);
+ message2 = consumer.receive(250);
+ assertNull(message2);
+
+ message = createMessage(session, 3);
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ producer.send(message);
+ message2 = consumer.receive(250);
+ assertNull(message2);
+
+ // Now try with a different id
+
+ message = createMessage(session, 4);
+ SimpleString dupID2 = new SimpleString("hijklmnop");
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ producer.send(message);
+ message2 = consumer.receive(1000);
+ assertEquals(4, message2.getProperty(propKey));
+
+ message = createMessage(session, 5);
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+ producer.send(message);
+ message2 = consumer.receive(1000);
+ assertNull(message2);
+
+ message = createMessage(session, 6);
+ message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+ producer.send(message);
+ message2 = consumer.receive(250);
+ assertNull(message2);
+
+ session.close();
+
+ sf.close();
+ }
+
public void testCacheSize() throws Exception
{
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
More information about the jboss-cvs-commits mailing list