[jboss-cvs] JBoss Messaging SVN: r5885 - in trunk: src/main/org/jboss/messaging/core/config and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Feb 17 20:39:27 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-02-17 20:39:27 -0500 (Tue, 17 Feb 2009)
New Revision: 5885
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java
Modified:
trunk/src/config/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/schemas/jbm-configuration.xsd
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
Log:
JBMESSAGING-1339 and JBMESSAGING-1294 - Delivery Counter fixes
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/src/config/jbm-configuration.xml 2009-02-18 01:39:27 UTC (rev 5885)
@@ -48,6 +48,8 @@
<backup>false</backup>
+ <strict-update-delivery>false</strict-update-delivery>
+
<!--
<backup-connector-ref connector-name="netty-backup"/>
-->
@@ -180,4 +182,4 @@
</configuration>
-</deployment>
\ No newline at end of file
+</deployment>
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2009-02-18 01:39:27 UTC (rev 5885)
@@ -51,14 +51,18 @@
void setClustered(boolean clustered);
+ boolean isStrictUpdateDelivery();
+
+ void setStrictUpdateDelivery(boolean strictUpdateDelivery);
+
boolean isBackup();
void setBackup(boolean backup);
-
+
long getQueueActivationTimeout();
-
+
void setQueueActivationTimeout(long timeout);
-
+
int getScheduledThreadPoolMaxSize();
void setScheduledThreadPoolMaxSize(int maxSize);
@@ -84,9 +88,9 @@
void setInterceptorClassNames(List<String> interceptors);
long getConnectionScanPeriod();
-
+
void setConnectionScanPeriod(long scanPeriod);
-
+
long getConnectionTTLOverride();
void setConnectionTTLOverride(long ttl);
@@ -94,58 +98,57 @@
Set<TransportConfiguration> getAcceptorConfigurations();
void setAcceptorConfigurations(Set<TransportConfiguration> infos);
-
+
Map<String, TransportConfiguration> getConnectorConfigurations();
- void setConnectorConfigurations(Map<String, TransportConfiguration> infos);
+ void setConnectorConfigurations(Map<String, TransportConfiguration> infos);
String getBackupConnectorName();
void setBackupConnectorName(String name);
-
+
List<BroadcastGroupConfiguration> getBroadcastGroupConfigurations();
-
+
void setBroadcastGroupConfigurations(List<BroadcastGroupConfiguration> configs);
-
+
Map<String, DiscoveryGroupConfiguration> getDiscoveryGroupConfigurations();
-
+
void setDiscoveryGroupConfigurations(Map<String, DiscoveryGroupConfiguration> configs);
-
+
List<BridgeConfiguration> getBridgeConfigurations();
void setBridgeConfigurations(final List<BridgeConfiguration> configs);
-
+
List<DivertConfiguration> getDivertConfigurations();
void setDivertConfigurations(final List<DivertConfiguration> configs);
-
+
List<ClusterConnectionConfiguration> getClusterConfigurations();
void setClusterConfigurations(final List<ClusterConnectionConfiguration> configs);
-
+
List<QueueConfiguration> getQueueConfigurations();
void setQueueConfigurations(final List<QueueConfiguration> configs);
-
+
SimpleString getManagementAddress();
-
+
void setManagementAddress(SimpleString address);
SimpleString getManagementNotificationAddress();
-
+
String getManagementClusterPassword();
long getManagementRequestTimeout();
int getIDCacheSize();
-
+
void setIDCacheSize(int idCacheSize);
-
+
boolean isPersistIDCache();
-
+
void setPersistIDCache(boolean persist);
-
// Journal related attributes ------------------------------------------------------------
String getBindingsDirectory();
@@ -192,13 +195,12 @@
void setCreateJournalDir(boolean create);
-
// Paging Properties --------------------------------------------------------------------
-
+
int getPagingMaxThreads();
-
+
void setPagingMaxThread(int pagingMaxThreads);
-
+
String getPagingDirectory();
void setPagingDirectory(String dir);
@@ -206,15 +208,15 @@
long getPagingMaxGlobalSizeBytes();
void setPagingMaxGlobalSizeBytes(long maxGlobalSize);
-
+
long getPagingDefaultSize();
-
+
void setPagingDefaultSize(long pageSize);
-
+
// Large Messages Properties ------------------------------------------------------------
-
+
String getLargeMessagesDirectory();
-
+
void setLargeMessagesDirectory(String directory);
boolean isWildcardRoutingEnabled();
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-02-18 01:39:27 UTC (rev 5885)
@@ -43,8 +43,10 @@
public static final boolean DEFAULT_CLUSTERED = false;
+ public static final boolean DEFAULT_STRICT_UPDATE_DELIVERY = false;
+
public static final boolean DEFAULT_BACKUP = false;
-
+
public static final long DEFAULT_QUEUE_ACTIVATION_TIMEOUT = 30000;
public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 30;
@@ -58,7 +60,7 @@
public static final boolean DEFAULT_JMX_MANAGEMENT_ENABLED = true;
public static final long DEFAULT_CONNECTION_SCAN_PERIOD = 1000;
-
+
public static final long DEFAULT_CONNECTION_TTL_OVERRIDE = -1;
public static final String DEFAULT_BINDINGS_DIRECTORY = "data/bindings";
@@ -68,13 +70,13 @@
public static final String DEFAULT_JOURNAL_DIR = "data/journal";
public static final String DEFAULT_PAGING_DIR = "data/paging";
-
+
public static final int DEFAULT_PAGE_MAX_THREADS = 10;
-
+
public static final long DEFAULT_PAGE_SIZE = 10 * 1024 * 1024;
-
+
public static final long DEFAULT_PAGE_MAX_GLOBAL_SIZE = -1;
-
+
public static final String DEFAULT_LARGE_MESSAGES_DIR = "data/largemessages";
public static final boolean DEFAULT_CREATE_JOURNAL_DIR = true;
@@ -100,19 +102,19 @@
public static final long DEFAULT_TRANSACTION_TIMEOUT = 60000;
public static final long DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD = 1000;
-
+
public static final SimpleString DEFAULT_MANAGEMENT_ADDRESS = new SimpleString("jbm.admin.management");
-
+
public static final SimpleString DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS = new SimpleString("jbm.admin.notification");
public static final String DEFAULT_MANAGEMENT_CLUSTER_PASSWORD = "CHANGE ME!!";
- public static final long DEFAULT_MANAGEMENT_REQUEST_TIMEOUT = 500;
-
+ public static final long DEFAULT_MANAGEMENT_REQUEST_TIMEOUT = 500;
+
public static final long DEFAULT_BROADCAST_PERIOD = 5000;
-
+
public static final long DEFAULT_BROADCAST_REFRESH_TIMEOUT = 10000;
-
+
public static final int DEFAULT_MAX_FORWARD_BATCH_SIZE = 1;
public static final long DEFAULT_MAX_FORWARD_BATCH_TIME = -1;
@@ -120,19 +122,19 @@
public static final long DEFAULT_MESSAGE_EXPIRY_SCAN_PERIOD = 30000;
public static final int DEFAULT_MESSAGE_EXPIRY_THREAD_PRIORITY = 3;
-
+
public static final int DEFAULT_ID_CACHE_SIZE = 2000;
-
+
public static final boolean DEFAULT_PERSIST_ID_CACHE = true;
-
+
public static final boolean DEFAULT_CLUSTER_DUPLICATE_DETECTION = true;
-
+
public static final boolean DEFAULT_CLUSTER_FORWARD_WHEN_NO_CONSUMERS = false;
-
+
public static final int DEFAULT_CLUSTER_MAX_HOPS = 1;
-
+
public static final boolean DEFAULT_DIVERT_EXCLUSIVE = false;
-
+
public static final boolean DEFAULT_BRIDGE_DUPLICATE_DETECTION = true;
// Attributes -----------------------------------------------------------------------------
@@ -141,6 +143,8 @@
protected boolean backup = DEFAULT_BACKUP;
+ protected boolean strictUpdateDelivery = DEFAULT_STRICT_UPDATE_DELIVERY;
+
protected long queueActivationTimeout = DEFAULT_QUEUE_ACTIVATION_TIMEOUT;
protected int scheduledThreadPoolMaxSize = DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
@@ -154,53 +158,51 @@
protected boolean jmxManagementEnabled = DEFAULT_JMX_MANAGEMENT_ENABLED;
protected long connectionScanPeriod = DEFAULT_CONNECTION_SCAN_PERIOD;
-
+
protected long connectionTTLOverride = DEFAULT_CONNECTION_TTL_OVERRIDE;
protected long messageExpiryScanPeriod = DEFAULT_MESSAGE_EXPIRY_SCAN_PERIOD;
protected int messageExpiryThreadPriority = DEFAULT_MESSAGE_EXPIRY_THREAD_PRIORITY;
-
+
protected int idCacheSize = DEFAULT_ID_CACHE_SIZE;
-
+
protected boolean persistIDCache = DEFAULT_PERSIST_ID_CACHE;
-
+
protected List<String> interceptorClassNames = new ArrayList<String>();
-
+
protected Map<String, TransportConfiguration> connectorConfigs = new HashMap<String, TransportConfiguration>();
protected Set<TransportConfiguration> acceptorConfigs = new HashSet<TransportConfiguration>();
protected String backupConnectorName;
-
+
protected List<BridgeConfiguration> bridgeConfigurations = new ArrayList<BridgeConfiguration>();
-
+
protected List<DivertConfiguration> divertConfigurations = new ArrayList<DivertConfiguration>();
-
+
protected List<ClusterConnectionConfiguration> clusterConfigurations = new ArrayList<ClusterConnectionConfiguration>();
-
+
protected List<QueueConfiguration> queueConfigurations = new ArrayList<QueueConfiguration>();
-
+
protected List<BroadcastGroupConfiguration> broadcastGroupConfigurations = new ArrayList<BroadcastGroupConfiguration>();
-
+
protected Map<String, DiscoveryGroupConfiguration> discoveryGroupConfigurations = new LinkedHashMap<String, DiscoveryGroupConfiguration>();
-
-
+
// Paging related attributes ------------------------------------------------------------
protected long pagingMaxGlobalSize = DEFAULT_PAGE_MAX_GLOBAL_SIZE;
-
+
protected long pagingDefaultSize = DEFAULT_PAGE_SIZE;
protected String pagingDirectory = DEFAULT_PAGING_DIR;
-
+
protected int pagingMaxThreads = DEFAULT_PAGE_MAX_THREADS;
-
// File related attributes -----------------------------------------------------------
protected String largeMessagesDirectory = DEFAULT_LARGE_MESSAGES_DIR;
-
+
protected String bindingsDirectory = DEFAULT_BINDINGS_DIRECTORY;
protected boolean createBindingsDir = DEFAULT_CREATE_BINDINGS_DIR;
@@ -230,15 +232,15 @@
protected long transactionTimeout = DEFAULT_TRANSACTION_TIMEOUT;
protected long transactionTimeoutScanPeriod = DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD;
-
+
protected SimpleString managementAddress = DEFAULT_MANAGEMENT_ADDRESS;
protected SimpleString managementNotificationAddress = DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
-
+
protected String managementClusterPassword = DEFAULT_MANAGEMENT_CLUSTER_PASSWORD;
protected long managementRequestTimeout = DEFAULT_MANAGEMENT_REQUEST_TIMEOUT;
-
+
public boolean isClustered()
{
return clustered;
@@ -254,6 +256,22 @@
return backup;
}
+ /**
+ * @return the strictUpdateDelivery
+ */
+ public boolean isStrictUpdateDelivery()
+ {
+ return strictUpdateDelivery;
+ }
+
+ /**
+ * @param strictUpdateDelivery the strictUpdateDelivery to set
+ */
+ public void setStrictUpdateDelivery(final boolean strictUpdateDelivery)
+ {
+ this.strictUpdateDelivery = strictUpdateDelivery;
+ }
+
public void setBackup(final boolean backup)
{
this.backup = backup;
@@ -264,9 +282,9 @@
return queueActivationTimeout;
}
- public void setQueueActivationTimeout(long timeout)
+ public void setQueueActivationTimeout(final long timeout)
{
- this.queueActivationTimeout = timeout;
+ queueActivationTimeout = timeout;
}
public int getScheduledThreadPoolMaxSize()
@@ -308,7 +326,7 @@
{
connectionScanPeriod = scanPeriod;
}
-
+
public long getConnectionTTLOverride()
{
return connectionTTLOverride;
@@ -316,7 +334,7 @@
public void setConnectionTTLOverride(final long ttl)
{
- this.connectionTTLOverride = ttl;
+ connectionTTLOverride = ttl;
}
public List<String> getInterceptorClassNames()
@@ -338,7 +356,7 @@
{
acceptorConfigs = infos;
}
-
+
public Map<String, TransportConfiguration> getConnectorConfigurations()
{
return connectorConfigs;
@@ -346,7 +364,7 @@
public void setConnectorConfigurations(final Map<String, TransportConfiguration> infos)
{
- this.connectorConfigs = infos;
+ connectorConfigs = infos;
}
public String getBackupConnectorName()
@@ -358,55 +376,55 @@
{
this.backupConnectorName = backupConnectorName;
}
-
+
public List<BridgeConfiguration> getBridgeConfigurations()
{
return bridgeConfigurations;
}
-
+
public void setBridgeConfigurations(final List<BridgeConfiguration> configs)
{
- this.bridgeConfigurations = configs;
+ bridgeConfigurations = configs;
}
public List<BroadcastGroupConfiguration> getBroadcastGroupConfigurations()
{
- return this.broadcastGroupConfigurations;
+ return broadcastGroupConfigurations;
}
-
+
public void setBroadcastGroupConfigurations(final List<BroadcastGroupConfiguration> configs)
{
- this.broadcastGroupConfigurations = configs;
+ broadcastGroupConfigurations = configs;
}
public List<ClusterConnectionConfiguration> getClusterConfigurations()
{
- return this.clusterConfigurations;
+ return clusterConfigurations;
}
-
+
public void setClusterConfigurations(final List<ClusterConnectionConfiguration> configs)
{
- this.clusterConfigurations = configs;
+ clusterConfigurations = configs;
}
public List<DivertConfiguration> getDivertConfigurations()
{
- return this.divertConfigurations;
+ return divertConfigurations;
}
-
+
public void setDivertConfigurations(final List<DivertConfiguration> configs)
{
- this.divertConfigurations = configs;
+ divertConfigurations = configs;
}
public List<QueueConfiguration> getQueueConfigurations()
{
- return this.queueConfigurations;
+ return queueConfigurations;
}
public void setQueueConfigurations(final List<QueueConfiguration> configs)
{
- this.queueConfigurations = configs;
+ queueConfigurations = configs;
}
public Map<String, DiscoveryGroupConfiguration> getDiscoveryGroupConfigurations()
@@ -418,25 +436,25 @@
{
this.discoveryGroupConfigurations = discoveryGroupConfigurations;
}
-
+
public int getIDCacheSize()
{
return idCacheSize;
}
-
+
public void setIDCacheSize(final int idCacheSize)
{
this.idCacheSize = idCacheSize;
}
-
+
public boolean isPersistIDCache()
{
return persistIDCache;
}
-
+
public void setPersistIDCache(final boolean persist)
{
- this.persistIDCache = persist;
+ persistIDCache = persist;
}
public String getBindingsDirectory()
@@ -463,17 +481,17 @@
{
return journalType;
}
-
+
public int getPagingMaxThreads()
{
return pagingMaxThreads;
}
-
+
public void setPagingMaxThread(final int pagingMaxThreads)
{
this.pagingMaxThreads = pagingMaxThreads;
}
-
+
public void setPagingDirectory(final String dir)
{
pagingDirectory = dir;
@@ -564,9 +582,9 @@
return wildcardRoutingEnabled;
}
- public void setWildcardRoutingEnabled(boolean enabled)
+ public void setWildcardRoutingEnabled(final boolean enabled)
{
- this.wildcardRoutingEnabled = enabled;
+ wildcardRoutingEnabled = enabled;
}
public long getTransactionTimeout()
@@ -574,7 +592,7 @@
return transactionTimeout;
}
- public void setTransactionTimeout(long timeout)
+ public void setTransactionTimeout(final long timeout)
{
transactionTimeout = timeout;
}
@@ -584,7 +602,7 @@
return transactionTimeoutScanPeriod;
}
- public void setTransactionTimeoutScanPeriod(long period)
+ public void setTransactionTimeoutScanPeriod(final long period)
{
transactionTimeoutScanPeriod = period;
}
@@ -594,7 +612,7 @@
return messageExpiryScanPeriod;
}
- public void setMessageExpiryScanPeriod(long messageExpiryScanPeriod)
+ public void setMessageExpiryScanPeriod(final long messageExpiryScanPeriod)
{
this.messageExpiryScanPeriod = messageExpiryScanPeriod;
}
@@ -604,7 +622,7 @@
return messageExpiryThreadPriority;
}
- public void setMessageExpiryThreadPriority(int messageExpiryThreadPriority)
+ public void setMessageExpiryThreadPriority(final int messageExpiryThreadPriority)
{
this.messageExpiryThreadPriority = messageExpiryThreadPriority;
}
@@ -648,7 +666,7 @@
{
pagingMaxGlobalSize = maxGlobalSize;
}
-
+
/* (non-Javadoc)
* @see org.jboss.messaging.core.config.Configuration#getPagingDefaultSize()
*/
@@ -660,64 +678,62 @@
/* (non-Javadoc)
* @see org.jboss.messaging.core.config.Configuration#setPagingDefaultSize(long)
*/
- public void setPagingDefaultSize(long pageSize)
+ public void setPagingDefaultSize(final long pageSize)
{
- this.pagingDefaultSize = pageSize;
+ pagingDefaultSize = pageSize;
}
-
-
+
public String getLargeMessagesDirectory()
{
return largeMessagesDirectory;
}
-
+
public void setLargeMessagesDirectory(final String directory)
{
- this.largeMessagesDirectory = directory;
+ largeMessagesDirectory = directory;
}
-
public boolean isMessageCounterEnabled()
{
return messageCounterEnabled;
}
-
+
public SimpleString getManagementAddress()
{
return managementAddress;
}
-
- public void setManagementAddress(SimpleString address)
+
+ public void setManagementAddress(final SimpleString address)
{
- this.managementAddress = address;
+ managementAddress = address;
}
-
+
public SimpleString getManagementNotificationAddress()
{
- return managementNotificationAddress ;
+ return managementNotificationAddress;
}
-
- public void setManagementNotificationAddress(SimpleString address)
+
+ public void setManagementNotificationAddress(final SimpleString address)
{
- this.managementNotificationAddress = address;
+ managementNotificationAddress = address;
}
-
+
public String getManagementClusterPassword()
{
return managementClusterPassword;
}
-
- public void setManagementClusterPassword(String clusterPassword)
+
+ public void setManagementClusterPassword(final String clusterPassword)
{
- this.managementClusterPassword = clusterPassword;
+ managementClusterPassword = clusterPassword;
}
-
+
public long getManagementRequestTimeout()
{
return managementRequestTimeout;
}
-
- public void setManagementRequestTimeout(long managementRequestTimeout)
+
+ public void setManagementRequestTimeout(final long managementRequestTimeout)
{
this.managementRequestTimeout = managementRequestTimeout;
}
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2009-02-18 01:39:27 UTC (rev 5885)
@@ -89,6 +89,8 @@
backup = getBoolean(e, "backup", backup);
+ strictUpdateDelivery = getBoolean(e, "strict-update-delivery", strictUpdateDelivery);
+
queueActivationTimeout = getLong(e, "queue-activation-timeout", queueActivationTimeout);
// NOTE! All the defaults come from the super class
@@ -121,12 +123,14 @@
managementAddress = new SimpleString(getString(e, "management-address", managementAddress.toString()));
- managementNotificationAddress = new SimpleString(getString(e, "management-notification-address", managementNotificationAddress.toString()));
+ managementNotificationAddress = new SimpleString(getString(e,
+ "management-notification-address",
+ managementNotificationAddress.toString()));
managementClusterPassword = getString(e, "management-cluster-password", managementClusterPassword.toString());
managementRequestTimeout = getLong(e, "management-request-timeout", managementRequestTimeout);
-
+
NodeList interceptorNodes = e.getElementsByTagName("remoting-interceptors");
ArrayList<String> interceptorList = new ArrayList<String>();
@@ -263,7 +267,7 @@
String s = getString(e, "journal-type", journalType.toString());
- if (s == null || (!s.equals(JournalType.NIO.toString()) && !s.equals(JournalType.ASYNCIO.toString())))
+ if (s == null || !s.equals(JournalType.NIO.toString()) && !s.equals(JournalType.ASYNCIO.toString()))
{
throw new IllegalArgumentException("Invalid journal type " + s);
}
@@ -299,14 +303,14 @@
return configurationUrl;
}
- public void setConfigurationUrl(String configurationUrl)
+ public void setConfigurationUrl(final String configurationUrl)
{
this.configurationUrl = configurationUrl;
}
// Private -------------------------------------------------------------------------
- private Boolean getBoolean(Element e, String name, Boolean def)
+ private Boolean getBoolean(final Element e, final String name, final Boolean def)
{
NodeList nl = e.getElementsByTagName(name);
if (nl.getLength() > 0)
@@ -316,7 +320,7 @@
return def;
}
- private Integer getInteger(Element e, String name, Integer def)
+ private Integer getInteger(final Element e, final String name, final Integer def)
{
NodeList nl = e.getElementsByTagName(name);
if (nl.getLength() > 0)
@@ -326,7 +330,7 @@
return def;
}
- private Long getLong(Element e, String name, Long def)
+ private Long getLong(final Element e, final String name, final Long def)
{
NodeList nl = e.getElementsByTagName(name);
if (nl.getLength() > 0)
@@ -336,7 +340,7 @@
return def;
}
- private String getString(Element e, String name, String def)
+ private String getString(final Element e, final String name, final String def)
{
NodeList nl = e.getElementsByTagName(name);
if (nl.getLength() > 0)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-02-18 01:39:27 UTC (rev 5885)
@@ -951,6 +951,7 @@
autoCommitSends,
autoCommitAcks,
preAcknowledge,
+ configuration.isStrictUpdateDelivery(),
xa,
connection,
storageManager,
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-02-18 01:39:27 UTC (rev 5885)
@@ -120,6 +120,8 @@
* if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
*/
private final boolean browseOnly;
+
+ private final boolean updateDeliveries;
private final StorageManager storageManager;
@@ -149,6 +151,7 @@
final PagingManager pagingManager,
final Channel channel,
final boolean preAcknowledge,
+ final boolean updateDeliveries,
final Executor executor,
final ManagementService managementService) throws Exception
{
@@ -181,6 +184,8 @@
binding.getQueue().addConsumer(this);
minLargeMessageSize = session.getMinLargeMessageSize();
+
+ this.updateDeliveries = updateDeliveries;
}
// ServerConsumer implementation
@@ -401,10 +406,6 @@
else
{
ref.getQueue().acknowledge(tx, ref);
-
- // Del count is not actually updated in storage unless it's
- // cancelled
- ref.incrementDeliveryCount();
}
}
while (ref.getMessage().getMessageID() != messageID);
@@ -651,6 +652,18 @@
deliverStandardMessage(ref, message);
}
+ ref.incrementDeliveryCount();
+
+ // When updateDeliveries is false (strict-update-delivery is off),
+ // the delivery count is still updated in storage after a cancel
+ if (updateDeliveries)
+ {
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
+ {
+ storageManager.updateDeliveryCount(ref);
+ }
+ }
+
return HandleStatus.HANDLED;
}
finally
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-02-18 01:39:27 UTC (rev 5885)
@@ -138,6 +138,8 @@
private final boolean autoCommitAcks;
private final boolean preAcknowledge;
+
+ private final boolean updateDeliveries;
private volatile RemotingConnection remotingConnection;
@@ -195,6 +197,7 @@
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean preAcknowledge,
+ final boolean updateDeliveries,
final boolean xa,
final RemotingConnection remotingConnection,
final StorageManager storageManager,
@@ -238,6 +241,8 @@
{
tx = new TransactionImpl(storageManager);
}
+
+ this.updateDeliveries = updateDeliveries;
this.channel = channel;
@@ -1412,6 +1417,7 @@
postOffice.getPagingManager(),
channel,
preAcknowledge,
+ updateDeliveries,
executor,
managementService);
Modified: trunk/src/schemas/jbm-configuration.xsd
===================================================================
--- trunk/src/schemas/jbm-configuration.xsd 2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/src/schemas/jbm-configuration.xsd 2009-02-18 01:39:27 UTC (rev 5885)
@@ -96,6 +96,9 @@
<xsd:element name="backup" type="xsd:boolean"
maxOccurs="1" minOccurs="0">
</xsd:element>
+ <xsd:element name="strict-update-delivery" type="xsd:boolean"
+ maxOccurs="1" minOccurs="0">
+ </xsd:element>
<xsd:element name="backup-connector-ref"
type="backup-connectorType" maxOccurs="1" minOccurs="0">
</xsd:element>
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2009-02-17 19:52:51 UTC (rev 5884)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2009-02-18 01:39:27 UTC (rev 5885)
@@ -3764,99 +3764,99 @@
}
// http://jira.jboss.org/jira/browse/JBMESSAGING-1294 - commented out until 2.0 beta
-// public void testExceptionMessageListener1() throws Exception
-// {
-// Connection conn = null;
-//
-// try
-// {
-// conn = cf.createConnection();
-//
-// conn.start();
-//
-// Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageConsumer cons = sess.createConsumer(queue1);
-//
-// ExceptionRedelMessageListenerImpl listener = new ExceptionRedelMessageListenerImpl(sess);
-//
-// cons.setMessageListener(listener);
-//
-// MessageProducer prod = sessSend.createProducer(queue1);
-// TextMessage m1 = sess.createTextMessage("a");
-// TextMessage m2 = sess.createTextMessage("b");
-// TextMessage m3 = sess.createTextMessage("c");
-//
-// prod.send(m1);
-// prod.send(m2);
-// prod.send(m3);
-//
-// listener.waitForMessages();
-//
-// assertFalse(listener.message, listener.failed);
-//
-// conn.close();
-//
-// conn = null;
-// }
-// finally
-// {
-// if (conn != null)
-// {
-// conn.close();
-// }
-// }
-// }
-//
-// public void testExceptionMessageListener2() throws Exception
-// {
-// Connection conn = null;
-//
-// try
-// {
-// conn = cf.createConnection();
-//
-// conn.start();
-//
-// Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// Session sess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-//
-// MessageConsumer cons = sess.createConsumer(queue1);
-//
-// ExceptionRedelMessageListenerImpl listener = new ExceptionRedelMessageListenerImpl(sess);
-//
-// cons.setMessageListener(listener);
-//
-// MessageProducer prod = sessSend.createProducer(queue1);
-// TextMessage m1 = sess.createTextMessage("a");
-// TextMessage m2 = sess.createTextMessage("b");
-// TextMessage m3 = sess.createTextMessage("c");
-//
-// prod.send(m1);
-// prod.send(m2);
-// prod.send(m3);
-//
-// listener.waitForMessages();
-//
-// assertFalse(listener.message, listener.failed);
-//
-//
-// conn.close();
-//
-// conn = null;
-// }
-// finally
-// {
-// if (conn != null)
-// {
-// conn.close();
-// }
-// }
-// }
+ public void testExceptionMessageListener1() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+ conn.start();
+
+ Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess.createConsumer(queue1);
+
+ ExceptionRedelMessageListenerImpl listener = new ExceptionRedelMessageListenerImpl(sess);
+
+ cons.setMessageListener(listener);
+
+ MessageProducer prod = sessSend.createProducer(queue1);
+ TextMessage m1 = sess.createTextMessage("a");
+ TextMessage m2 = sess.createTextMessage("b");
+ TextMessage m3 = sess.createTextMessage("c");
+
+ prod.send(m1);
+ prod.send(m2);
+ prod.send(m3);
+
+ listener.waitForMessages();
+
+ assertFalse(listener.message, listener.failed);
+
+ conn.close();
+
+ conn = null;
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testExceptionMessageListener2() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ conn.start();
+
+ Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess.createConsumer(queue1);
+
+ ExceptionRedelMessageListenerImpl listener = new ExceptionRedelMessageListenerImpl(sess);
+
+ cons.setMessageListener(listener);
+
+ MessageProducer prod = sessSend.createProducer(queue1);
+ TextMessage m1 = sess.createTextMessage("a");
+ TextMessage m2 = sess.createTextMessage("b");
+ TextMessage m3 = sess.createTextMessage("c");
+
+ prod.send(m1);
+ prod.send(m2);
+ prod.send(m3);
+
+ listener.waitForMessages();
+
+ assertFalse(listener.message, listener.failed);
+
+
+ conn.close();
+
+ conn = null;
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
public void testExceptionMessageListener3() throws Exception
{
Connection conn = null;
Added: trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/consumer/RedeliveryConsumerTest.java 2009-02-18 01:39:27 UTC (rev 5885)
@@ -0,0 +1,164 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.consumer;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * Integration test for redelivery: checks the delivery count seen by a consumer across server restarts, with strict-update-delivery on and off.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Feb 17, 2009 6:06:11 PM
+ *
+ *
+ */
+public class RedeliveryConsumerTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ MessagingService messagingService;
+
+ final SimpleString ADDRESS = new SimpleString("address");
+
+ ClientSessionFactory factory;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testRedeliveryMessageStrict() throws Exception
+ {
+ testDedeliveryMessageOnPersistent(true);
+ }
+
+ public void testRedeliveryMessageSimpleCancel() throws Exception
+ {
+ testDedeliveryMessageOnPersistent(false);
+ }
+
+ protected void testDedeliveryMessageOnPersistent(boolean strictUpdate) throws Exception
+ {
+ setUp(strictUpdate);
+ ClientSession session = factory.createSession(false, true, false);
+ ClientProducer prod = session.createProducer(ADDRESS);
+ prod.send(createTextMessage(session, "Hello"));
+ session.commit();
+ session.close();
+
+ messagingService.stop();
+ messagingService.start();
+
+ session = factory.createSession(false, true, false);
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ ClientMessage msg = consumer.receive(1000);
+ assertNotNull(msg);
+ assertEquals(1, msg.getDeliveryCount());
+ session.stop();
+
+ // if strictUpdate == true, this simulates a crash, where the server is stopped without closing/rolling back the session
+ if (!strictUpdate)
+ {
+ // If non-strict, at least rollback/cancel should still update the delivery counts
+ session.rollback();
+ session.close();
+ }
+
+ messagingService.stop();
+
+ messagingService.start();
+
+ session = factory.createSession(false, true, false);
+ session.start();
+ consumer = session.createConsumer(ADDRESS);
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ assertEquals(2, msg.getDeliveryCount());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ /**
+ * @param strictUpdateDelivery value passed to Configuration.setStrictUpdateDelivery before the service is started
+ * @throws Exception
+ * @throws MessagingException
+ */
+ private void setUp(boolean strictUpdateDelivery) throws Exception, MessagingException
+ {
+ Configuration config = createFileConfig();
+ config.setJournalFileSize(10 * 1024);
+ config.setJournalMinFiles(2);
+ config.setSecurityEnabled(false);
+ config.setStrictUpdateDelivery(strictUpdateDelivery);
+
+ messagingService = createService(true, config);
+ messagingService.start();
+
+ factory = createInVMFactory();
+
+ ClientSession session = factory.createSession(false, false, false);
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ session.close();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ if (messagingService != null && messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the jboss-cvs-commits
mailing list