[jboss-cvs] JBoss Messaging SVN: r5885 - in trunk: src/main/org/jboss/messaging/core/config and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Feb 17 20:39:27 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-02-17 20:39:27 -0500 (Tue, 17 Feb 2009)
New Revision: 5885

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java
Modified:
   trunk/src/config/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/schemas/jbm-configuration.xsd
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
Log:
JBMESSAGING-1339 and JBMESSAGING-1294 - Delivery Counter fixes

Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/src/config/jbm-configuration.xml	2009-02-18 01:39:27 UTC (rev 5885)
@@ -48,6 +48,8 @@
 
       <backup>false</backup>
       
+      <strict-update-delivery>false</strict-update-delivery>
+
       <!--
       <backup-connector-ref connector-name="netty-backup"/>
       -->
@@ -180,4 +182,4 @@
 
    </configuration>
    
-</deployment>
\ No newline at end of file
+</deployment>

Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2009-02-18 01:39:27 UTC (rev 5885)
@@ -51,14 +51,18 @@
 
    void setClustered(boolean clustered);
 
+   boolean isStrictUpdateDelivery();
+
+   void setStrictUpdateDelivery(boolean strictUpdateDelivery);
+
    boolean isBackup();
 
    void setBackup(boolean backup);
-   
+
    long getQueueActivationTimeout();
-   
+
    void setQueueActivationTimeout(long timeout);
-   
+
    int getScheduledThreadPoolMaxSize();
 
    void setScheduledThreadPoolMaxSize(int maxSize);
@@ -84,9 +88,9 @@
    void setInterceptorClassNames(List<String> interceptors);
 
    long getConnectionScanPeriod();
-   
+
    void setConnectionScanPeriod(long scanPeriod);
-   
+
    long getConnectionTTLOverride();
 
    void setConnectionTTLOverride(long ttl);
@@ -94,58 +98,57 @@
    Set<TransportConfiguration> getAcceptorConfigurations();
 
    void setAcceptorConfigurations(Set<TransportConfiguration> infos);
-   
+
    Map<String, TransportConfiguration> getConnectorConfigurations();
 
-   void setConnectorConfigurations(Map<String, TransportConfiguration> infos);   
+   void setConnectorConfigurations(Map<String, TransportConfiguration> infos);
 
    String getBackupConnectorName();
 
    void setBackupConnectorName(String name);
-   
+
    List<BroadcastGroupConfiguration> getBroadcastGroupConfigurations();
-   
+
    void setBroadcastGroupConfigurations(List<BroadcastGroupConfiguration> configs);
-   
+
    Map<String, DiscoveryGroupConfiguration> getDiscoveryGroupConfigurations();
-   
+
    void setDiscoveryGroupConfigurations(Map<String, DiscoveryGroupConfiguration> configs);
-   
+
    List<BridgeConfiguration> getBridgeConfigurations();
 
    void setBridgeConfigurations(final List<BridgeConfiguration> configs);
-   
+
    List<DivertConfiguration> getDivertConfigurations();
 
    void setDivertConfigurations(final List<DivertConfiguration> configs);
-   
+
    List<ClusterConnectionConfiguration> getClusterConfigurations();
 
    void setClusterConfigurations(final List<ClusterConnectionConfiguration> configs);
-   
+
    List<QueueConfiguration> getQueueConfigurations();
 
    void setQueueConfigurations(final List<QueueConfiguration> configs);
-   
+
    SimpleString getManagementAddress();
-   
+
    void setManagementAddress(SimpleString address);
 
    SimpleString getManagementNotificationAddress();
-   
+
    String getManagementClusterPassword();
 
    long getManagementRequestTimeout();
 
    int getIDCacheSize();
-   
+
    void setIDCacheSize(int idCacheSize);
-   
+
    boolean isPersistIDCache();
-   
+
    void setPersistIDCache(boolean persist);
 
-   
    // Journal related attributes ------------------------------------------------------------
 
    String getBindingsDirectory();
@@ -192,13 +195,12 @@
 
    void setCreateJournalDir(boolean create);
 
-   
    // Paging Properties --------------------------------------------------------------------
-   
+
    int getPagingMaxThreads();
-   
+
    void setPagingMaxThread(int pagingMaxThreads);
-   
+
    String getPagingDirectory();
 
    void setPagingDirectory(String dir);
@@ -206,15 +208,15 @@
    long getPagingMaxGlobalSizeBytes();
 
    void setPagingMaxGlobalSizeBytes(long maxGlobalSize);
-   
+
    long getPagingDefaultSize();
-   
+
    void setPagingDefaultSize(long pageSize);
-   
+
    // Large Messages Properties ------------------------------------------------------------
-   
+
    String getLargeMessagesDirectory();
-   
+
    void setLargeMessagesDirectory(String directory);
 
    boolean isWildcardRoutingEnabled();

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2009-02-18 01:39:27 UTC (rev 5885)
@@ -43,8 +43,10 @@
 
    public static final boolean DEFAULT_CLUSTERED = false;
 
+   public static final boolean DEFAULT_STRICT_UPDATE_DELIVERY = false;
+
    public static final boolean DEFAULT_BACKUP = false;
-   
+
    public static final long DEFAULT_QUEUE_ACTIVATION_TIMEOUT = 30000;
 
    public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 30;
@@ -58,7 +60,7 @@
    public static final boolean DEFAULT_JMX_MANAGEMENT_ENABLED = true;
 
    public static final long DEFAULT_CONNECTION_SCAN_PERIOD = 1000;
-   
+
    public static final long DEFAULT_CONNECTION_TTL_OVERRIDE = -1;
 
    public static final String DEFAULT_BINDINGS_DIRECTORY = "data/bindings";
@@ -68,13 +70,13 @@
    public static final String DEFAULT_JOURNAL_DIR = "data/journal";
 
    public static final String DEFAULT_PAGING_DIR = "data/paging";
-   
+
    public static final int DEFAULT_PAGE_MAX_THREADS = 10;
-   
+
    public static final long DEFAULT_PAGE_SIZE = 10 * 1024 * 1024;
-   
+
    public static final long DEFAULT_PAGE_MAX_GLOBAL_SIZE = -1;
-   
+
    public static final String DEFAULT_LARGE_MESSAGES_DIR = "data/largemessages";
 
    public static final boolean DEFAULT_CREATE_JOURNAL_DIR = true;
@@ -100,19 +102,19 @@
    public static final long DEFAULT_TRANSACTION_TIMEOUT = 60000;
 
    public static final long DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD = 1000;
-   
+
    public static final SimpleString DEFAULT_MANAGEMENT_ADDRESS = new SimpleString("jbm.admin.management");
-   
+
    public static final SimpleString DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS = new SimpleString("jbm.admin.notification");
 
    public static final String DEFAULT_MANAGEMENT_CLUSTER_PASSWORD = "CHANGE ME!!";
 
-   public static final  long DEFAULT_MANAGEMENT_REQUEST_TIMEOUT = 500;
-   
+   public static final long DEFAULT_MANAGEMENT_REQUEST_TIMEOUT = 500;
+
    public static final long DEFAULT_BROADCAST_PERIOD = 5000;
-   
+
    public static final long DEFAULT_BROADCAST_REFRESH_TIMEOUT = 10000;
-   
+
    public static final int DEFAULT_MAX_FORWARD_BATCH_SIZE = 1;
 
    public static final long DEFAULT_MAX_FORWARD_BATCH_TIME = -1;
@@ -120,19 +122,19 @@
    public static final long DEFAULT_MESSAGE_EXPIRY_SCAN_PERIOD = 30000;
 
    public static final int DEFAULT_MESSAGE_EXPIRY_THREAD_PRIORITY = 3;
-   
+
    public static final int DEFAULT_ID_CACHE_SIZE = 2000;
-   
+
    public static final boolean DEFAULT_PERSIST_ID_CACHE = true;
-   
+
    public static final boolean DEFAULT_CLUSTER_DUPLICATE_DETECTION = true;
-   
+
    public static final boolean DEFAULT_CLUSTER_FORWARD_WHEN_NO_CONSUMERS = false;
-   
+
    public static final int DEFAULT_CLUSTER_MAX_HOPS = 1;
-   
+
    public static final boolean DEFAULT_DIVERT_EXCLUSIVE = false;
-   
+
    public static final boolean DEFAULT_BRIDGE_DUPLICATE_DETECTION = true;
 
    // Attributes -----------------------------------------------------------------------------
@@ -141,6 +143,8 @@
 
    protected boolean backup = DEFAULT_BACKUP;
 
+   protected boolean strictUpdateDelivery = DEFAULT_STRICT_UPDATE_DELIVERY;
+
    protected long queueActivationTimeout = DEFAULT_QUEUE_ACTIVATION_TIMEOUT;
 
    protected int scheduledThreadPoolMaxSize = DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
@@ -154,53 +158,51 @@
    protected boolean jmxManagementEnabled = DEFAULT_JMX_MANAGEMENT_ENABLED;
 
    protected long connectionScanPeriod = DEFAULT_CONNECTION_SCAN_PERIOD;
-   
+
    protected long connectionTTLOverride = DEFAULT_CONNECTION_TTL_OVERRIDE;
 
    protected long messageExpiryScanPeriod = DEFAULT_MESSAGE_EXPIRY_SCAN_PERIOD;
 
    protected int messageExpiryThreadPriority = DEFAULT_MESSAGE_EXPIRY_THREAD_PRIORITY;
-   
+
    protected int idCacheSize = DEFAULT_ID_CACHE_SIZE;
-   
+
    protected boolean persistIDCache = DEFAULT_PERSIST_ID_CACHE;
-   
+
    protected List<String> interceptorClassNames = new ArrayList<String>();
-   
+
    protected Map<String, TransportConfiguration> connectorConfigs = new HashMap<String, TransportConfiguration>();
 
    protected Set<TransportConfiguration> acceptorConfigs = new HashSet<TransportConfiguration>();
 
    protected String backupConnectorName;
-   
+
    protected List<BridgeConfiguration> bridgeConfigurations = new ArrayList<BridgeConfiguration>();
-   
+
    protected List<DivertConfiguration> divertConfigurations = new ArrayList<DivertConfiguration>();
-   
+
    protected List<ClusterConnectionConfiguration> clusterConfigurations = new ArrayList<ClusterConnectionConfiguration>();
-   
+
    protected List<QueueConfiguration> queueConfigurations = new ArrayList<QueueConfiguration>();
-   
+
    protected List<BroadcastGroupConfiguration> broadcastGroupConfigurations = new ArrayList<BroadcastGroupConfiguration>();
-   
+
    protected Map<String, DiscoveryGroupConfiguration> discoveryGroupConfigurations = new LinkedHashMap<String, DiscoveryGroupConfiguration>();
-   
-   
+
    // Paging related attributes ------------------------------------------------------------
 
    protected long pagingMaxGlobalSize = DEFAULT_PAGE_MAX_GLOBAL_SIZE;
-   
+
    protected long pagingDefaultSize = DEFAULT_PAGE_SIZE;
 
    protected String pagingDirectory = DEFAULT_PAGING_DIR;
-   
+
    protected int pagingMaxThreads = DEFAULT_PAGE_MAX_THREADS;
-   
 
    // File related attributes -----------------------------------------------------------
 
    protected String largeMessagesDirectory = DEFAULT_LARGE_MESSAGES_DIR;
-   
+
    protected String bindingsDirectory = DEFAULT_BINDINGS_DIRECTORY;
 
    protected boolean createBindingsDir = DEFAULT_CREATE_BINDINGS_DIR;
@@ -230,15 +232,15 @@
    protected long transactionTimeout = DEFAULT_TRANSACTION_TIMEOUT;
 
    protected long transactionTimeoutScanPeriod = DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD;
-   
+
    protected SimpleString managementAddress = DEFAULT_MANAGEMENT_ADDRESS;
 
    protected SimpleString managementNotificationAddress = DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
-   
+
    protected String managementClusterPassword = DEFAULT_MANAGEMENT_CLUSTER_PASSWORD;
 
    protected long managementRequestTimeout = DEFAULT_MANAGEMENT_REQUEST_TIMEOUT;
-   
+
    public boolean isClustered()
    {
       return clustered;
@@ -254,6 +256,22 @@
       return backup;
    }
 
+   /**
+    * @return the strictUpdateDelivery
+    */
+   public boolean isStrictUpdateDelivery()
+   {
+      return strictUpdateDelivery;
+   }
+
+   /**
+    * @param strictUpdateDelivery the strictUpdateDelivery to set
+    */
+   public void setStrictUpdateDelivery(final boolean strictUpdateDelivery)
+   {
+      this.strictUpdateDelivery = strictUpdateDelivery;
+   }
+
    public void setBackup(final boolean backup)
    {
       this.backup = backup;
@@ -264,9 +282,9 @@
       return queueActivationTimeout;
    }
 
-   public void setQueueActivationTimeout(long timeout)
+   public void setQueueActivationTimeout(final long timeout)
    {
-      this.queueActivationTimeout = timeout;
+      queueActivationTimeout = timeout;
    }
 
    public int getScheduledThreadPoolMaxSize()
@@ -308,7 +326,7 @@
    {
       connectionScanPeriod = scanPeriod;
    }
-   
+
    public long getConnectionTTLOverride()
    {
       return connectionTTLOverride;
@@ -316,7 +334,7 @@
 
    public void setConnectionTTLOverride(final long ttl)
    {
-      this.connectionTTLOverride = ttl;
+      connectionTTLOverride = ttl;
    }
 
    public List<String> getInterceptorClassNames()
@@ -338,7 +356,7 @@
    {
       acceptorConfigs = infos;
    }
-   
+
    public Map<String, TransportConfiguration> getConnectorConfigurations()
    {
       return connectorConfigs;
@@ -346,7 +364,7 @@
 
    public void setConnectorConfigurations(final Map<String, TransportConfiguration> infos)
    {
-      this.connectorConfigs = infos;
+      connectorConfigs = infos;
    }
 
    public String getBackupConnectorName()
@@ -358,55 +376,55 @@
    {
       this.backupConnectorName = backupConnectorName;
    }
-   
+
    public List<BridgeConfiguration> getBridgeConfigurations()
    {
       return bridgeConfigurations;
    }
-   
+
    public void setBridgeConfigurations(final List<BridgeConfiguration> configs)
    {
-      this.bridgeConfigurations = configs;
+      bridgeConfigurations = configs;
    }
 
    public List<BroadcastGroupConfiguration> getBroadcastGroupConfigurations()
    {
-      return this.broadcastGroupConfigurations;
+      return broadcastGroupConfigurations;
    }
-   
+
    public void setBroadcastGroupConfigurations(final List<BroadcastGroupConfiguration> configs)
    {
-      this.broadcastGroupConfigurations = configs;
+      broadcastGroupConfigurations = configs;
    }
 
    public List<ClusterConnectionConfiguration> getClusterConfigurations()
    {
-      return this.clusterConfigurations;
+      return clusterConfigurations;
    }
-   
+
    public void setClusterConfigurations(final List<ClusterConnectionConfiguration> configs)
    {
-      this.clusterConfigurations = configs;
+      clusterConfigurations = configs;
    }
 
    public List<DivertConfiguration> getDivertConfigurations()
    {
-      return this.divertConfigurations;
+      return divertConfigurations;
    }
-   
+
    public void setDivertConfigurations(final List<DivertConfiguration> configs)
    {
-      this.divertConfigurations = configs;
+      divertConfigurations = configs;
    }
 
    public List<QueueConfiguration> getQueueConfigurations()
    {
-      return this.queueConfigurations;
+      return queueConfigurations;
    }
 
    public void setQueueConfigurations(final List<QueueConfiguration> configs)
    {
-      this.queueConfigurations = configs;
+      queueConfigurations = configs;
    }
 
    public Map<String, DiscoveryGroupConfiguration> getDiscoveryGroupConfigurations()
@@ -418,25 +436,25 @@
    {
       this.discoveryGroupConfigurations = discoveryGroupConfigurations;
    }
-   
+
    public int getIDCacheSize()
    {
       return idCacheSize;
    }
-   
+
    public void setIDCacheSize(final int idCacheSize)
    {
       this.idCacheSize = idCacheSize;
    }
-   
+
    public boolean isPersistIDCache()
    {
       return persistIDCache;
    }
-   
+
    public void setPersistIDCache(final boolean persist)
    {
-      this.persistIDCache = persist;
+      persistIDCache = persist;
    }
 
    public String getBindingsDirectory()
@@ -463,17 +481,17 @@
    {
       return journalType;
    }
-   
+
    public int getPagingMaxThreads()
    {
       return pagingMaxThreads;
    }
-   
+
    public void setPagingMaxThread(final int pagingMaxThreads)
    {
       this.pagingMaxThreads = pagingMaxThreads;
    }
-   
+
    public void setPagingDirectory(final String dir)
    {
       pagingDirectory = dir;
@@ -564,9 +582,9 @@
       return wildcardRoutingEnabled;
    }
 
-   public void setWildcardRoutingEnabled(boolean enabled)
+   public void setWildcardRoutingEnabled(final boolean enabled)
    {
-      this.wildcardRoutingEnabled = enabled;
+      wildcardRoutingEnabled = enabled;
    }
 
    public long getTransactionTimeout()
@@ -574,7 +592,7 @@
       return transactionTimeout;
    }
 
-   public void setTransactionTimeout(long timeout)
+   public void setTransactionTimeout(final long timeout)
    {
       transactionTimeout = timeout;
    }
@@ -584,7 +602,7 @@
       return transactionTimeoutScanPeriod;
    }
 
-   public void setTransactionTimeoutScanPeriod(long period)
+   public void setTransactionTimeoutScanPeriod(final long period)
    {
       transactionTimeoutScanPeriod = period;
    }
@@ -594,7 +612,7 @@
       return messageExpiryScanPeriod;
    }
 
-   public void setMessageExpiryScanPeriod(long messageExpiryScanPeriod)
+   public void setMessageExpiryScanPeriod(final long messageExpiryScanPeriod)
    {
       this.messageExpiryScanPeriod = messageExpiryScanPeriod;
    }
@@ -604,7 +622,7 @@
       return messageExpiryThreadPriority;
    }
 
-   public void setMessageExpiryThreadPriority(int messageExpiryThreadPriority)
+   public void setMessageExpiryThreadPriority(final int messageExpiryThreadPriority)
    {
       this.messageExpiryThreadPriority = messageExpiryThreadPriority;
    }
@@ -648,7 +666,7 @@
    {
       pagingMaxGlobalSize = maxGlobalSize;
    }
-   
+
    /* (non-Javadoc)
     * @see org.jboss.messaging.core.config.Configuration#getPagingDefaultSize()
     */
@@ -660,64 +678,62 @@
    /* (non-Javadoc)
     * @see org.jboss.messaging.core.config.Configuration#setPagingDefaultSize(long)
     */
-   public void setPagingDefaultSize(long pageSize)
+   public void setPagingDefaultSize(final long pageSize)
    {
-      this.pagingDefaultSize = pageSize;
+      pagingDefaultSize = pageSize;
    }
-   
-   
+
    public String getLargeMessagesDirectory()
    {
       return largeMessagesDirectory;
    }
-   
+
    public void setLargeMessagesDirectory(final String directory)
    {
-      this.largeMessagesDirectory = directory;
+      largeMessagesDirectory = directory;
    }
-   
 
    public boolean isMessageCounterEnabled()
    {
       return messageCounterEnabled;
    }
-   
+
    public SimpleString getManagementAddress()
    {
       return managementAddress;
    }
-   
-   public void setManagementAddress(SimpleString address)
+
+   public void setManagementAddress(final SimpleString address)
    {
-      this.managementAddress = address;
+      managementAddress = address;
    }
-   
+
    public SimpleString getManagementNotificationAddress()
    {
-      return managementNotificationAddress ;
+      return managementNotificationAddress;
    }
-   
-   public void setManagementNotificationAddress(SimpleString address)
+
+   public void setManagementNotificationAddress(final SimpleString address)
    {
-      this.managementNotificationAddress = address;
+      managementNotificationAddress = address;
    }
-      
+
    public String getManagementClusterPassword()
    {
       return managementClusterPassword;
    }
-   
-   public void setManagementClusterPassword(String clusterPassword)
+
+   public void setManagementClusterPassword(final String clusterPassword)
    {
-      this.managementClusterPassword = clusterPassword;
+      managementClusterPassword = clusterPassword;
    }
-   
+
    public long getManagementRequestTimeout()
    {
       return managementRequestTimeout;
    }
-   
-   public void setManagementRequestTimeout(long managementRequestTimeout)
+
+   public void setManagementRequestTimeout(final long managementRequestTimeout)
    {
       this.managementRequestTimeout = managementRequestTimeout;
    }

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2009-02-18 01:39:27 UTC (rev 5885)
@@ -89,6 +89,8 @@
 
       backup = getBoolean(e, "backup", backup);
 
+      strictUpdateDelivery = getBoolean(e, "strict-update-delivery", strictUpdateDelivery);
+
       queueActivationTimeout = getLong(e, "queue-activation-timeout", queueActivationTimeout);
 
       // NOTE! All the defaults come from the super class
@@ -121,12 +123,14 @@
 
       managementAddress = new SimpleString(getString(e, "management-address", managementAddress.toString()));
 
-      managementNotificationAddress = new SimpleString(getString(e, "management-notification-address", managementNotificationAddress.toString()));
+      managementNotificationAddress = new SimpleString(getString(e,
+                                                                 "management-notification-address",
+                                                                 managementNotificationAddress.toString()));
 
       managementClusterPassword = getString(e, "management-cluster-password", managementClusterPassword.toString());
 
       managementRequestTimeout = getLong(e, "management-request-timeout", managementRequestTimeout);
-      
+
       NodeList interceptorNodes = e.getElementsByTagName("remoting-interceptors");
 
       ArrayList<String> interceptorList = new ArrayList<String>();
@@ -263,7 +267,7 @@
 
       String s = getString(e, "journal-type", journalType.toString());
 
-      if (s == null || (!s.equals(JournalType.NIO.toString()) && !s.equals(JournalType.ASYNCIO.toString())))
+      if (s == null || !s.equals(JournalType.NIO.toString()) && !s.equals(JournalType.ASYNCIO.toString()))
       {
          throw new IllegalArgumentException("Invalid journal type " + s);
       }
@@ -299,14 +303,14 @@
       return configurationUrl;
    }
 
-   public void setConfigurationUrl(String configurationUrl)
+   public void setConfigurationUrl(final String configurationUrl)
    {
       this.configurationUrl = configurationUrl;
    }
 
    // Private -------------------------------------------------------------------------
 
-   private Boolean getBoolean(Element e, String name, Boolean def)
+   private Boolean getBoolean(final Element e, final String name, final Boolean def)
    {
       NodeList nl = e.getElementsByTagName(name);
       if (nl.getLength() > 0)
@@ -316,7 +320,7 @@
       return def;
    }
 
-   private Integer getInteger(Element e, String name, Integer def)
+   private Integer getInteger(final Element e, final String name, final Integer def)
    {
       NodeList nl = e.getElementsByTagName(name);
       if (nl.getLength() > 0)
@@ -326,7 +330,7 @@
       return def;
    }
 
-   private Long getLong(Element e, String name, Long def)
+   private Long getLong(final Element e, final String name, final Long def)
    {
       NodeList nl = e.getElementsByTagName(name);
       if (nl.getLength() > 0)
@@ -336,7 +340,7 @@
       return def;
    }
 
-   private String getString(Element e, String name, String def)
+   private String getString(final Element e, final String name, final String def)
    {
       NodeList nl = e.getElementsByTagName(name);
       if (nl.getLength() > 0)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-02-18 01:39:27 UTC (rev 5885)
@@ -951,6 +951,7 @@
                                                               autoCommitSends,
                                                               autoCommitAcks,
                                                               preAcknowledge,
+                                                              configuration.isStrictUpdateDelivery(),
                                                               xa,
                                                               connection,
                                                               storageManager,

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-02-18 01:39:27 UTC (rev 5885)
@@ -120,6 +120,8 @@
     * if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
     */
    private final boolean browseOnly;
+   
+   private final boolean updateDeliveries;
 
    private final StorageManager storageManager;
 
@@ -149,6 +151,7 @@
                              final PagingManager pagingManager,
                              final Channel channel,
                              final boolean preAcknowledge,
+                             final boolean updateDeliveries,
                              final Executor executor,
                              final ManagementService managementService) throws Exception
    {
@@ -181,6 +184,8 @@
       binding.getQueue().addConsumer(this);
 
       minLargeMessageSize = session.getMinLargeMessageSize();
+      
+      this.updateDeliveries = updateDeliveries;
    }
 
    // ServerConsumer implementation
@@ -401,10 +406,6 @@
          else
          {
             ref.getQueue().acknowledge(tx, ref);
-
-            // Del count is not actually updated in storage unless it's
-            // cancelled
-            ref.incrementDeliveryCount();
          }
       }
       while (ref.getMessage().getMessageID() != messageID);
@@ -651,6 +652,18 @@
             deliverStandardMessage(ref, message);
          }
 
+         ref.incrementDeliveryCount();
+         
+         // When updateDeliveries is false (strict-update-delivery not enabled), the delivery
+         // count is not persisted here; it would still be updated in storage after a cancel
+         if (updateDeliveries)
+         {
+            if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
+            {
+               storageManager.updateDeliveryCount(ref);
+            }
+         }
+         
          return HandleStatus.HANDLED;
       }
       finally

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-02-18 01:39:27 UTC (rev 5885)
@@ -138,6 +138,8 @@
    private final boolean autoCommitAcks;
 
    private final boolean preAcknowledge;
+   
+   private final boolean updateDeliveries;
 
    private volatile RemotingConnection remotingConnection;
 
@@ -195,6 +197,7 @@
                             final boolean autoCommitSends,
                             final boolean autoCommitAcks,
                             final boolean preAcknowledge,
+                            final boolean updateDeliveries,
                             final boolean xa,
                             final RemotingConnection remotingConnection,
                             final StorageManager storageManager,
@@ -238,6 +241,8 @@
       {
          tx = new TransactionImpl(storageManager);
       }
+      
+      this.updateDeliveries = updateDeliveries;
 
       this.channel = channel;
 
@@ -1412,6 +1417,7 @@
                                                           postOffice.getPagingManager(),
                                                           channel,
                                                           preAcknowledge,
+                                                          updateDeliveries,
                                                           executor,
                                                           managementService);
 

Modified: trunk/src/schemas/jbm-configuration.xsd
===================================================================
--- trunk/src/schemas/jbm-configuration.xsd	2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/src/schemas/jbm-configuration.xsd	2009-02-18 01:39:27 UTC (rev 5885)
@@ -96,6 +96,9 @@
 				<xsd:element name="backup" type="xsd:boolean"
 					maxOccurs="1" minOccurs="0">
 				</xsd:element>
+				<xsd:element name="strict-update-delivery" type="xsd:boolean"
+					maxOccurs="1" minOccurs="0">
+				</xsd:element>
 				<xsd:element name="backup-connector-ref"
 					type="backup-connectorType" maxOccurs="1" minOccurs="0">
 				</xsd:element>

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2009-02-18 01:39:27 UTC (rev 5885)
@@ -3764,99 +3764,99 @@
    }
 
    // http://jira.jboss.org/jira/browse/JBMESSAGING-1294 - commented out until 2.0 beta
-//   public void testExceptionMessageListener1() throws Exception
-//   {
-//   	Connection conn = null;
-//      
-//      try
-//      {	      
-//	      conn = cf.createConnection();
-//
-//	      conn.start();
-//	
-//	      Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//	
-//	      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//	
-//	      MessageConsumer cons = sess.createConsumer(queue1);
-//	
-//	      ExceptionRedelMessageListenerImpl listener = new ExceptionRedelMessageListenerImpl(sess);
-//	
-//	      cons.setMessageListener(listener);
-//	
-//	      MessageProducer prod = sessSend.createProducer(queue1);
-//	      TextMessage m1 = sess.createTextMessage("a");
-//	      TextMessage m2 = sess.createTextMessage("b");
-//	      TextMessage m3 = sess.createTextMessage("c");
-//	
-//	      prod.send(m1);
-//	      prod.send(m2);
-//	      prod.send(m3);
-//	
-//	      listener.waitForMessages();
-//	
-//	      assertFalse(listener.message, listener.failed);
-//	
-//	      conn.close();
-//	      
-//	      conn = null;
-//	   }
-//	   finally
-//	   {
-//	   	if (conn != null)
-//	   	{
-//	   		conn.close();
-//	   	}
-//	   }
-//   }
-//
-//   public void testExceptionMessageListener2() throws Exception
-//   {
-//   	Connection conn = null;
-//      
-//      try
-//      {	      
-//	      conn = cf.createConnection();
-//
-//	      conn.start();
-//	
-//	      Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//	
-//	      Session sess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-//	
-//	      MessageConsumer cons = sess.createConsumer(queue1);
-//	
-//	      ExceptionRedelMessageListenerImpl listener = new ExceptionRedelMessageListenerImpl(sess);
-//	
-//	      cons.setMessageListener(listener);
-//	
-//	      MessageProducer prod = sessSend.createProducer(queue1);
-//	      TextMessage m1 = sess.createTextMessage("a");
-//	      TextMessage m2 = sess.createTextMessage("b");
-//	      TextMessage m3 = sess.createTextMessage("c");
-//	
-//	      prod.send(m1);
-//	      prod.send(m2);
-//	      prod.send(m3);
-//	
-//	      listener.waitForMessages();
-//	
-//	      assertFalse(listener.message, listener.failed);
-//	  	
-//	
-//	      conn.close();
-//	      
-//	      conn = null;
-//	   }
-//	   finally
-//	   {
-//	   	if (conn != null)
-//	   	{
-//	   		conn.close();
-//	   	}
-//	   }
-//   }
+   public void testExceptionMessageListener1() throws Exception
+   {
+   	Connection conn = null;
+      // Test for http://jira.jboss.org/jira/browse/JBMESSAGING-1294 with an AUTO_ACKNOWLEDGE consumer session
+      try
+      {	      
+	      conn = cf.createConnection();
 
+	      conn.start();
+	      // Two non-transacted sessions on the same connection: one to send, one to consume
+	      Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+	
+	      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+	
+	      MessageConsumer cons = sess.createConsumer(queue1);
+	      // NOTE(review): presumably this listener throws from onMessage to force redelivery - confirm in ExceptionRedelMessageListenerImpl
+	      ExceptionRedelMessageListenerImpl listener = new ExceptionRedelMessageListenerImpl(sess);
+	
+	      cons.setMessageListener(listener);
+	      // The messages are created from the consuming session but sent through sessSend's producer
+	      MessageProducer prod = sessSend.createProducer(queue1);
+	      TextMessage m1 = sess.createTextMessage("a");
+	      TextMessage m2 = sess.createTextMessage("b");
+	      TextMessage m3 = sess.createTextMessage("c");
+	
+	      prod.send(m1);
+	      prod.send(m2);
+	      prod.send(m3);
+	      // Wait on the listener, then fail with listener.message if it flagged a failure
+	      listener.waitForMessages();
+	
+	      assertFalse(listener.message, listener.failed);
+	      // After a clean close, null out conn so the finally block does not close it a second time
+	      conn.close();
+	      
+	      conn = null;
+	   }
+	   finally
+	   {
+	   	if (conn != null)
+	   	{
+	   		conn.close();
+	   	}
+	   }
+   }
+
+   public void testExceptionMessageListener2() throws Exception
+   {
+   	Connection conn = null;
+      // Test for http://jira.jboss.org/jira/browse/JBMESSAGING-1294 with a DUPS_OK_ACKNOWLEDGE consumer session
+      try
+      {	      
+	      conn = cf.createConnection();
+
+	      conn.start();
+	      // Non-transacted sessions: the sender uses AUTO_ACKNOWLEDGE, the consumer DUPS_OK_ACKNOWLEDGE
+	      Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+	
+	      Session sess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+	
+	      MessageConsumer cons = sess.createConsumer(queue1);
+	      // NOTE(review): presumably this listener throws from onMessage to force redelivery - confirm in ExceptionRedelMessageListenerImpl
+	      ExceptionRedelMessageListenerImpl listener = new ExceptionRedelMessageListenerImpl(sess);
+	
+	      cons.setMessageListener(listener);
+	      // The messages are created from the consuming session but sent through sessSend's producer
+	      MessageProducer prod = sessSend.createProducer(queue1);
+	      TextMessage m1 = sess.createTextMessage("a");
+	      TextMessage m2 = sess.createTextMessage("b");
+	      TextMessage m3 = sess.createTextMessage("c");
+	
+	      prod.send(m1);
+	      prod.send(m2);
+	      prod.send(m3);
+	      // Wait on the listener, then fail with listener.message if it flagged a failure
+	      listener.waitForMessages();
+	
+	      assertFalse(listener.message, listener.failed);
+	      // After a clean close, null out conn so the finally block does not close it a second time
+	
+	      conn.close();
+	      
+	      conn = null;
+	   }
+	   finally
+	   {
+	   	if (conn != null)
+	   	{
+	   		conn.close();
+	   	}
+	   }
+   }
+
    public void testExceptionMessageListener3() throws Exception
    {
       Connection conn = null;

Added: trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java	2009-02-18 01:39:27 UTC (rev 5885)
@@ -0,0 +1,164 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.consumer;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A RedeliveryConsumerTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Feb 17, 2009 6:06:11 PM
+ *
+ *
+ */
+public class RedeliveryConsumerTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   MessagingService messagingService;
+
+   final SimpleString ADDRESS = new SimpleString("address");
+
+   ClientSessionFactory factory;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /** Strict mode: the server is restarted while the consuming session is still open. */
+   public void testRedeliveryMessageStrict() throws Exception
+   {
+      testDedeliveryMessageOnPersistent(true);
+   }
+
+   /** Non-strict mode: the consuming session is rolled back and closed before the restart. */
+   public void testRedeliveryMessageSimpleCancel() throws Exception
+   {
+      testDedeliveryMessageOnPersistent(false);
+   }
+
+   /**
+    * Sends one message, receives it across server restarts and expects delivery counts 1, then 2.
+    * @param strictUpdate forwarded to setUp(boolean), which sets it as strict-update-delivery
+    */
+   protected void testDedeliveryMessageOnPersistent(boolean strictUpdate) throws Exception
+   {
+      setUp(strictUpdate);
+      ClientSession session = factory.createSession(false, true, false);
+      ClientProducer prod = session.createProducer(ADDRESS);
+      prod.send(createTextMessage(session, "Hello"));
+      session.commit();
+      session.close();
+      // Restart the server before the first delivery
+      messagingService.stop();
+      messagingService.start();
+      
+      session = factory.createSession(false, true, false);
+      session.start();
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+      
+      ClientMessage msg = consumer.receive(1000);
+      assertNotNull(msg);
+      assertEquals(1, msg.getDeliveryCount());
+      session.stop();
+      
+      // If strictUpdate == true, this simulates a crash: the server is stopped without closing/rolling back the session
+      if (!strictUpdate)
+      {
+         // If non-strict, at least rollback/cancel should still update the delivery counts
+         session.rollback();
+         session.close();
+      }
+      // Restart again; the next delivery of the same message must report a count of 2
+      messagingService.stop();
+      
+      messagingService.start();
+      
+      session = factory.createSession(false, true, false);
+      session.start();
+      consumer = session.createConsumer(ADDRESS);
+      msg = consumer.receive(1000);
+      assertNotNull(msg);
+      assertEquals(2, msg.getDeliveryCount());
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   /**
+    * Starts a messaging service built from createFileConfig(), then creates the session factory
+    * through createInVMFactory() and a queue named ADDRESS on the address ADDRESS.
+    * @param strictUpdateDelivery value passed to Configuration.setStrictUpdateDelivery
+    */
+   private void setUp(boolean strictUpdateDelivery) throws Exception, MessagingException
+   {
+      Configuration config = createFileConfig();
+      config.setJournalFileSize(10 * 1024);
+      config.setJournalMinFiles(2);
+      config.setSecurityEnabled(false);
+      config.setStrictUpdateDelivery(strictUpdateDelivery);
+
+      messagingService = createService(true, config);
+      messagingService.start();
+
+      factory = createInVMFactory();
+
+      ClientSession session = factory.createSession(false, false, false);
+      session.createQueue(ADDRESS, ADDRESS, true);
+
+      session.close();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+      if (messagingService != null && messagingService.isStarted())
+      {
+         messagingService.stop();
+      }
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}




More information about the jboss-cvs-commits mailing list