[jboss-cvs] JBoss Messaging SVN: r6789 - in trunk: src/main/org/jboss/messaging/core/client/impl and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu May 14 07:07:25 EDT 2009


Author: timfox
Date: 2009-05-14 07:07:24 -0400 (Thu, 14 May 2009)
New Revision: 6789

Modified:
   trunk/examples/jms/instantiate-connection-factory/src/org/jboss/jms/example/InstantiateConnectionFactoryExample.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
   trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java
Log:
a few tweaks, and allow strings in duplicate detection

Modified: trunk/examples/jms/instantiate-connection-factory/src/org/jboss/jms/example/InstantiateConnectionFactoryExample.java
===================================================================
--- trunk/examples/jms/instantiate-connection-factory/src/org/jboss/jms/example/InstantiateConnectionFactoryExample.java	2009-05-14 11:01:19 UTC (rev 6788)
+++ trunk/examples/jms/instantiate-connection-factory/src/org/jboss/jms/example/InstantiateConnectionFactoryExample.java	2009-05-14 11:07:24 UTC (rev 6789)
@@ -68,7 +68,6 @@
          // The server port etc.
 
          Map<String, Object> connectionParams = new HashMap<String, Object>();
-
          connectionParams.put(PORT_PROP_NAME, 5446);
 
          TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName(),

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-05-14 11:01:19 UTC (rev 6788)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2009-05-14 11:07:24 UTC (rev 6789)
@@ -58,7 +58,7 @@
 
    public static final String DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME = "org.jboss.messaging.core.client.impl.RoundRobinConnectionLoadBalancingPolicy";
 
-   public static final long DEFAULT_PING_PERIOD = 1000000;
+   public static final long DEFAULT_PING_PERIOD = 5000;
 
    // 5 minutes - normally this should be much higher than ping period, this allows clients to re-attach on live
    // or backup without fear of session having already been closed when connection times out.
@@ -107,7 +107,7 @@
 
    public static final int DEFAULT_THREAD_POOL_MAX_SIZE = -1;
 
-   public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 1;
+   public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 2;
 
    // Attributes
    // -----------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-05-14 11:01:19 UTC (rev 6788)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-05-14 11:07:24 UTC (rev 6789)
@@ -365,7 +365,7 @@
             ByteBuffer buff = ByteBuffer.wrap(bytes);
 
             buff.putLong(msg.getMessageID());
-
+                        
             msg.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, bytes);
          }
 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-05-14 11:01:19 UTC (rev 6788)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-05-14 11:07:24 UTC (rev 6789)
@@ -576,7 +576,9 @@
    {
       SimpleString address = message.getDestination();
 
-      byte[] duplicateID = (byte[])message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
+      byte[] duplicateIDBytes = null;
+      
+      Object duplicateID = message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
 
       DuplicateIDCache cache = null;
 
@@ -584,8 +586,17 @@
       {
          cache = getDuplicateIDCache(message.getDestination());
 
-         if (cache.contains(duplicateID))
+         if (duplicateID instanceof SimpleString)
          {
+            duplicateIDBytes = ((SimpleString)duplicateID).getData();
+         }
+         else
+         {
+            duplicateIDBytes = (byte[])duplicateID;
+         }
+         
+         if (cache.contains(duplicateIDBytes))
+         {
             if (tx == null)
             {
                log.trace("Duplicate message detected - message will not be routed");
@@ -614,7 +625,7 @@
             startedTx = true;
          }
          
-         cache.addToCache(duplicateID, tx);
+         cache.addToCache(duplicateIDBytes, tx);
       }
 
       if (tx == null)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-05-14 11:01:19 UTC (rev 6788)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-05-14 11:07:24 UTC (rev 6789)
@@ -137,12 +137,9 @@
 
    private final Binding binding;
 
-   private MessagingServer server;
-
    // Constructors ---------------------------------------------------------------------------------
 
-   public ServerConsumerImpl(final MessagingServer server,
-                             final long id,
+   public ServerConsumerImpl(final long id,
                              final long replicatedSessionID,
                              final ServerSession session,
                              final QueueBinding binding,
@@ -158,8 +155,6 @@
                              final Executor executor,
                              final ManagementService managementService) throws Exception
    {
-      this.server = server;
-
       this.id = id;
 
       this.replicatedSessionID = replicatedSessionID;
@@ -190,11 +185,11 @@
 
       this.managementService = managementService;
 
-      binding.getQueue().addConsumer(this);
+      this.minLargeMessageSize = session.getMinLargeMessageSize();
 
-      minLargeMessageSize = session.getMinLargeMessageSize();
-
       this.updateDeliveries = updateDeliveries;
+      
+      binding.getQueue().addConsumer(this);
    }
 
    // ServerConsumer implementation
@@ -271,7 +266,7 @@
    {
       return deliveringRefs.size();
    }
-   
+
    public LinkedList<MessageReference> cancelRefs(final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
    {
       boolean performACK = lastConsumedAsDelivered;
@@ -334,9 +329,13 @@
 
          if (trace)
          {
-            log.trace("Received " + credits + " credits, previous value = " + previous + " currentValue = " + availableCredits.get());
+            log.trace("Received " + credits +
+                      " credits, previous value = " +
+                      previous +
+                      " currentValue = " +
+                      availableCredits.get());
          }
-         
+
          if (previous <= 0 && previous + credits > 0)
          {
             promptDelivery();
@@ -372,7 +371,8 @@
                                             messageID +
                                             " backup = " +
                                             messageQueue.isBackup() +
-                                            " queue = " + messageQueue.getName() + 
+                                            " queue = " +
+                                            messageQueue.getName() +
                                             " closed = " +
                                             closed);
          }
@@ -451,7 +451,8 @@
             if (ref == null)
             {
                throw new IllegalStateException("Cannot find Reference[" + messageID +
-                                            "] after depaging on Queue " + messageQueue.getName());
+                                               "] after depaging on Queue " +
+                                               messageQueue.getName());
             }
          }
       }
@@ -902,7 +903,7 @@
          pendingLargeMessage.releaseResources();
 
          int counter = pendingLargeMessage.decrementRefCount();
-         
+
          if (preAcknowledge && !browseOnly)
          {
             // PreAck will have an extra reference

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-05-14 11:01:19 UTC (rev 6788)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-05-14 11:07:24 UTC (rev 6789)
@@ -11,6 +11,20 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CREATED;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
 import org.jboss.messaging.core.buffers.ChannelBuffers;
 import org.jboss.messaging.core.client.impl.ClientMessageImpl;
 import org.jboss.messaging.core.client.management.impl.ManagementHelper;
@@ -20,7 +34,6 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.ManagementService;
 import org.jboss.messaging.core.management.Notification;
-import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CREATED;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.BindingType;
@@ -85,17 +98,6 @@
 import org.jboss.messaging.utils.SimpleString;
 import org.jboss.messaging.utils.TypedProperties;
 
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-
 /*
  * Session implementation 
  * 
@@ -487,7 +489,7 @@
    {
       if (replicatingChannel == null)
       {
-            doHandleCommit(packet);
+         doHandleCommit(packet);
       }
       else
       {
@@ -503,8 +505,7 @@
 
    public void handleRollback(final RollbackMessage packet)
    {
-      
-      
+
       if (replicatingChannel == null)
       {
          doHandleRollback(packet);
@@ -625,7 +626,7 @@
 
    public void handleXARollback(final SessionXARollbackMessage packet)
    {
-      
+
       if (replicatingChannel == null)
       {
          doHandleXARollback(packet);
@@ -871,7 +872,7 @@
 
    public void handleClose(final Packet packet)
    {
-      
+
       if (replicatingChannel == null)
       {
          doHandleClose(packet);
@@ -912,7 +913,7 @@
    public void handleCloseConsumer(final SessionConsumerCloseMessage packet)
    {
       final ServerConsumer consumer = consumers.get(packet.getConsumerID());
-      
+
       consumer.setStarted(false);
 
       if (replicatingChannel == null)
@@ -922,7 +923,7 @@
       else
       {
          final Queue queue;
-         
+
          if (consumer.getCountOfPendingDeliveries() > 0)
          {
             queue = consumer.getQueue();
@@ -932,7 +933,7 @@
          {
             queue = null;
          }
-         
+
          // We need to stop the consumer first before replicating, to ensure no deliveries occur after this,
          // but we need to process the actual close() when the replication response returns, otherwise things
          // can happen like acks can come in after close
@@ -1294,8 +1295,7 @@
             theQueue = (Queue)binding.getBindable();
          }
 
-         ServerConsumer consumer = new ServerConsumerImpl(server,
-                                                          idGenerator.generateID(),
+         ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
                                                           oppositeChannelID,
                                                           this,
                                                           (QueueBinding)binding,
@@ -2529,12 +2529,12 @@
    private HashSet<Queue> lockUsedQueues(Xid xid)
    {
       final HashSet<Queue> queues = new HashSet<Queue>();
-      
+
       for (ServerConsumer consumer : consumers.values())
       {
          queues.add(consumer.getQueue());
       }
-      
+
       Transaction localTX;
       if (xid == null)
       {
@@ -2544,34 +2544,16 @@
       {
          localTX = resourceManager.getTransaction(xid);
       }
-      
+
       if (localTX != null)
       {
          queues.addAll(localTX.getDistinctQueues());
       }
-      
+
       for (Queue queue : queues)
       {
          queue.lockDelivery();
       }
       return queues;
    }
-
-
-   private void doSecurity(final ServerMessage msg) throws Exception
-   {
-      try
-      {
-         securityStore.check(msg.getDestination(), CheckType.SEND, this);
-      }
-      catch (MessagingException e)
-      {
-         if (!autoCommitSends)
-         {
-            tx.markAsRollbackOnly(e);
-         }
-         throw e;
-      }
-   }
-
 }

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java	2009-05-14 11:01:19 UTC (rev 6788)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java	2009-05-14 11:07:24 UTC (rev 6789)
@@ -64,7 +64,7 @@
    // Constants -----------------------------------------------------
 
    public static final SimpleString REPLYTO_HEADER_NAME = ClientMessageImpl.REPLYTO_HEADER_NAME;
-   
+
    public static final SimpleString CORRELATIONID_HEADER_NAME = new SimpleString("JMSCorrelationID");
 
    public static final SimpleString JBM_MESSAGE_ID = new SimpleString("JMSMessageID");
@@ -78,7 +78,7 @@
    private static final SimpleString JMS_ = new SimpleString("JMS_");
 
    public static final String JMSXDELIVERYCOUNT = "JMSXDeliveryCount";
-   
+
    public static final String JMS_JBM_INPUT_STREAM = "JMS_JBM_InputStream";
 
    public static final String JMS_JBM_OUTPUT_STREAM = "JMS_JBM_OutputStream";
@@ -91,7 +91,7 @@
    public static final String JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST = "JBM_BRIDGE_MSG_ID_LIST";
 
    public static final byte TYPE = 0;
-   
+
    public static Map<String, Object> coreMaptoJMSMap(final Map<String, Object> coreMessage)
    {
       Map<String, Object> jmsMessage = new HashMap<String, Object>();
@@ -122,7 +122,7 @@
             jmsMessage.put(entry.getKey(), entry.getValue());
          }
       }
-      
+
       return jmsMessage;
    }
 
@@ -154,17 +154,17 @@
 
       switch (type)
       {
-         case JBossMessage.TYPE: //0
+         case JBossMessage.TYPE: // 0
          {
             msg = new JBossMessage(message, session);
             break;
          }
-         case JBossBytesMessage.TYPE: //4
+         case JBossBytesMessage.TYPE: // 4
          {
             msg = new JBossBytesMessage(message, session);
             break;
          }
-         case JBossMapMessage.TYPE: //5
+         case JBossMapMessage.TYPE: // 5
          {
             msg = new JBossMapMessage(message, session);
             break;
@@ -174,12 +174,12 @@
             msg = new JBossObjectMessage(message, session);
             break;
          }
-         case JBossStreamMessage.TYPE:  //6
+         case JBossStreamMessage.TYPE: // 6
          {
             msg = new JBossStreamMessage(message, session);
             break;
          }
-         case JBossTextMessage.TYPE: //3
+         case JBossTextMessage.TYPE: // 3
          {
             msg = new JBossTextMessage(message, session);
             break;
@@ -577,7 +577,7 @@
       {
          message.removeProperty(propName);
       }
-      
+
       propertiesReadOnly = false;
    }
 
@@ -804,12 +804,11 @@
       {
          return message.getBodyInputStream();
       }
-      else
-      if (JMSXDELIVERYCOUNT.equals(name))
+      else if (JMSXDELIVERYCOUNT.equals(name))
       {
          return String.valueOf(message.getDeliveryCount());
       }
-      
+
       Object val = message.getProperty(new SimpleString(name));
       if (val instanceof SimpleString)
       {
@@ -899,28 +898,28 @@
 
    public void setObjectProperty(final String name, final Object value) throws JMSException
    {
-      
       if (JMS_JBM_OUTPUT_STREAM.equals(name))
       {
-         this.setOutputStream((OutputStream)value);
+         setOutputStream((OutputStream)value);
+         
          return;
       }
-      else
-      if (JMS_JBM_SAVE_STREAM.equals(name))
+      else if (JMS_JBM_SAVE_STREAM.equals(name))
       {
-         this.saveToOutputStream((OutputStream)value);
+         saveToOutputStream((OutputStream)value);
+         
          return;
       }
-      
+
       checkProperty(name, value);
-      
 
       if (JMS_JBM_INPUT_STREAM.equals(name))
       {
-         this.setInputStream((InputStream)value);
+         setInputStream((InputStream)value);
+         
          return;
       }
-      
+
       SimpleString key = new SimpleString(name);
 
       if (value instanceof Boolean)
@@ -1001,8 +1000,7 @@
    {
       return JBossMessage.TYPE;
    }
-   
-   
+
    public void setInputStream(final InputStream input) throws JMSException
    {
       checkStream();
@@ -1013,7 +1011,7 @@
 
       message.setBodyInputStream(input);
    }
-   
+
    public void setOutputStream(final OutputStream output) throws JMSException
    {
       checkStream();
@@ -1021,7 +1019,7 @@
       {
          throw new IllegalStateException("OutputStream property is only valid on received messages");
       }
-      
+
       try
       {
          message.setOutputStream(output);
@@ -1039,7 +1037,7 @@
       {
          throw new IllegalStateException("OutputStream property is only valid on received messages");
       }
-      
+
       try
       {
          message.saveToOutputStream(output);
@@ -1062,7 +1060,6 @@
          throw JMSExceptionHelper.convertFromMessagingException(e);
       }
    }
-   
 
    public String toString()
    {
@@ -1093,7 +1090,7 @@
          throw new MessageNotReadableException("Message is write-only");
       }
    }
-   
+
    protected MessagingBuffer getBody()
    {
       return message.getBody();
@@ -1108,7 +1105,7 @@
          throw new IllegalStateException("LargeMessage streaming is only possible on ByteMessage or StreamMessage");
       }
    }
-   
+
    private void checkProperty(final String name, final Object value) throws JMSException
    {
       if (propertiesReadOnly)

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java	2009-05-14 11:01:19 UTC (rev 6788)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java	2009-05-14 11:07:24 UTC (rev 6789)
@@ -126,7 +126,73 @@
 
       sf.close();
    }
+   
+   public void testSimpleDuplicateDetectionWithString() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
 
+      ClientSession session = sf.createSession(false, true, true);
+
+      session.start();
+
+      final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
+
+      session.createQueue(queueName, queueName, null, false);
+
+      ClientProducer producer = session.createProducer(queueName);
+
+      ClientConsumer consumer = session.createConsumer(queueName);
+
+      ClientMessage message = createMessage(session, 0);
+      producer.send(message);
+      ClientMessage message2 = consumer.receive(1000);
+      assertEquals(0, message2.getProperty(propKey));
+
+      message = createMessage(session, 1);
+      SimpleString dupID = new SimpleString("abcdefg");
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      producer.send(message);
+      message2 = consumer.receive(1000);
+      assertEquals(1, message2.getProperty(propKey));
+
+      message = createMessage(session, 2);
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      producer.send(message);
+      message2 = consumer.receive(250);
+      assertNull(message2);
+
+      message = createMessage(session, 3);
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      producer.send(message);
+      message2 = consumer.receive(250);
+      assertNull(message2);
+
+      // Now try with a different id
+
+      message = createMessage(session, 4);
+      SimpleString dupID2 = new SimpleString("hijklmnop");
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      producer.send(message);
+      message2 = consumer.receive(1000);
+      assertEquals(4, message2.getProperty(propKey));
+
+      message = createMessage(session, 5);
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      producer.send(message);
+      message2 = consumer.receive(1000);
+      assertNull(message2);
+
+      message = createMessage(session, 6);
+      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      producer.send(message);
+      message2 = consumer.receive(250);
+      assertNull(message2);
+
+      session.close();
+
+      sf.close();
+   }
+
    public void testCacheSize() throws Exception
    {
       ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));




More information about the jboss-cvs-commits mailing list