[jboss-cvs] JBoss Messaging SVN: r2572 - in branches/Branch_1_2_0_SP: docs/examples/ejb3mdb and 15 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Apr 11 12:59:32 EDT 2007
Author: timfox
Date: 2007-04-11 12:59:31 -0400 (Wed, 11 Apr 2007)
New Revision: 2572
Added:
branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/filepersist/
branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java
branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/filepersist/DataFile.java
branches/Branch_1_2_0_SP/tests/src/org/jboss/test/messaging/core/filepersist/
branches/Branch_1_2_0_SP/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java
Removed:
branches/Branch_1_2_0_SP/src/etc/server/default/deploy/bridge-service.xml
branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java
branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/filepersist/DataFile.java
branches/Branch_1_2_0_SP/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java
Modified:
branches/Branch_1_2_0_SP/build-messaging.xml
branches/Branch_1_2_0_SP/docs/examples/ejb3mdb/do-not-distribute.properties
branches/Branch_1_2_0_SP/docs/userguide/en/modules/performance.xml
branches/Branch_1_2_0_SP/src/etc/server/default/deploy/clustered-mssql-persistence-service.xml
branches/Branch_1_2_0_SP/src/etc/server/default/deploy/clustered-sybase-persistence-service.xml
branches/Branch_1_2_0_SP/src/etc/server/default/deploy/mssql-persistence-service.xml
branches/Branch_1_2_0_SP/src/etc/server/default/deploy/sybase-persistence-service.xml
branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/FailoverCommandCenter.java
branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/container/ClusteringAspect.java
branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java
branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/ServerPeer.java
branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/Version.java
branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
branches/Branch_1_2_0_SP/tests/src/org/jboss/test/messaging/jms/QueueTest.java
Log:
Merged from TRUNK->Branch_1_2_0_SP
Modified: branches/Branch_1_2_0_SP/build-messaging.xml
===================================================================
--- branches/Branch_1_2_0_SP/build-messaging.xml 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/build-messaging.xml 2007-04-11 16:59:31 UTC (rev 2572)
@@ -48,10 +48,10 @@
<property name="messaging.version.major" value="1"/>
<property name="messaging.version.minor" value="2"/>
<property name="messaging.version.revision" value="0"/>
- <property name="messaging.version.incrementing" value="12"/>
- <property name="messaging.version.tag" value="SP1"/>
+ <property name="messaging.version.incrementing" value="13"/>
+ <property name="messaging.version.tag" value="SP2"/>
<property name="messaging.version.name" value="Alkali"/>
- <property name="messaging.version.cvstag" value="JBossMessaging_1_2_0_SP1"/>
+ <property name="messaging.version.cvstag" value="JBossMessaging_1_2_0_SP2"/>
<property name="module.name" value="messaging"/>
<property name="module.Name" value="JBoss Messaging"/>
<property name="module.version" value="${messaging.version.major}.${messaging.version.minor}.${messaging.version.revision}.${messaging.version.tag}"/>
Modified: branches/Branch_1_2_0_SP/docs/examples/ejb3mdb/do-not-distribute.properties
===================================================================
--- branches/Branch_1_2_0_SP/docs/examples/ejb3mdb/do-not-distribute.properties 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/docs/examples/ejb3mdb/do-not-distribute.properties 2007-04-11 16:59:31 UTC (rev 2572)
@@ -5,4 +5,4 @@
messaging.client.jar.path=../../../output/lib
messaging.client.jar.name=jboss-messaging-client.jar
jboss.configuration=messaging
-jboss.home=C:\\work\\src\\jboss-4.0.5.GA-src\\build\\output\\jboss-4.0.5.GA-ejb3
+jboss.home=/extra/clebert/jboss/jboss-install
Modified: branches/Branch_1_2_0_SP/docs/userguide/en/modules/performance.xml
===================================================================
--- branches/Branch_1_2_0_SP/docs/userguide/en/modules/performance.xml 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/docs/userguide/en/modules/performance.xml 2007-04-11 16:59:31 UTC (rev 2572)
@@ -61,7 +61,7 @@
</programlisting>
</section>
-sssssssssssd
+
<section>
<title>Setup the Tests</title>
Deleted: branches/Branch_1_2_0_SP/src/etc/server/default/deploy/bridge-service.xml
===================================================================
--- branches/Branch_1_2_0_SP/src/etc/server/default/deploy/bridge-service.xml 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/etc/server/default/deploy/bridge-service.xml 2007-04-11 16:59:31 UTC (rev 2572)
@@ -1,63 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--
- Example Message Bridge configurations
-
- $Id: destinations-service.xml 1930 2007-01-09 18:16:04Z timfox $
- -->
-
-<server>
-
- <mbean code="org.jboss.jms.server.bridge.BridgeService"
- name="jboss.messaging:service=Bridge,name=exampleBridge"
- xmbean-dd="xmdesc/Bridge-xmbean.xml">
-
- <attribute name="SourceConnectionFactoryLookup">/ConnectionFactory</attribute>
-
- <attribute name="TargetConnectionFactoryLookup">/ConnectionFactory</attribute>
-
- <attribute name="SourceDestinationLookup">/topic/sourceTopic</attribute>
-
- <attribute name="TargetDestinationLookup">/queue/targetQueue</attribute>
-
- <attribute name="SourceUsername">bob</attribute>
-
- <attribute name="SourcePassword">pwd1</attribute>
-
- <attribute name="TargetUsername">jane</attribute>
-
- <attribute name="TargetPassword">pwd2</attribute>
-
- <attribute name="QualityOfServiceMode">2</attribute>
-
- <attribute name="Selector">vegetable='marrow'</attribute>
-
- <attribute name="MaxBatchSize">100</attribute>
-
- <attribute name="MaxBatchTime">5000</attribute>
-
- <attribute name="SubName">mySubscription</attribute>
-
- <attribute name="ClientID">clientid-123</attribute>
-
- <attribute name="FailureRetryInterval">5000</attribute>
-
- <attribute name="MaxRetries">-1</attribute>
-
- <attribute name="SourceJNDIProperties"><![CDATA[
-java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
-java.naming.provider.url=jnp://server1:1099
-java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
- ]]>
- </attribute>
-
- <attribute name="TargetJNDIProperties"><![CDATA[
-java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
-java.naming.provider.url=jnp://server2:1099
-java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
- ]]>
- </attribute>
- </mbean>
-
-
-</server>
\ No newline at end of file
Modified: branches/Branch_1_2_0_SP/src/etc/server/default/deploy/clustered-mssql-persistence-service.xml
===================================================================
--- branches/Branch_1_2_0_SP/src/etc/server/default/deploy/clustered-mssql-persistence-service.xml 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/etc/server/default/deploy/clustered-mssql-persistence-service.xml 2007-04-11 16:59:31 UTC (rev 2572)
@@ -75,10 +75,10 @@
<attribute name="DataSource">java:/DefaultDS</attribute>
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID SMALLINT, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID INTEGER, CLUSTERED CHAR(1))
-INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED) VALUES (?, ?, ?, ?, ?, ?, ?)
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID SMALLINT, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID INTEGER, CLSTERED CHAR(1))
+INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED) VALUES (?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
-LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME = ?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME = ?
]]></attribute>
<attribute name="GroupName">DefaultPostOffice</attribute>
<attribute name="StateTimeout">5000</attribute>
Modified: branches/Branch_1_2_0_SP/src/etc/server/default/deploy/clustered-sybase-persistence-service.xml
===================================================================
--- branches/Branch_1_2_0_SP/src/etc/server/default/deploy/clustered-sybase-persistence-service.xml 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/etc/server/default/deploy/clustered-sybase-persistence-service.xml 2007-04-11 16:59:31 UTC (rev 2572)
@@ -76,10 +76,10 @@
<attribute name="DataSource">java:/DefaultDS</attribute>
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID SMALLINT, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023) NULL, CHANNEL_ID INTEGER, CLUSTERED CHAR(1))
-INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED) VALUES (?, ?, ?, ?, ?, ?, ?)
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID SMALLINT, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023) NULL, CHANNEL_ID INTEGER, CLSTERED CHAR(1))
+INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED) VALUES (?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
-LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME = ?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME = ?
]]></attribute>
<attribute name="GroupName">DefaultPostOffice</attribute>
<attribute name="StateTimeout">5000</attribute>
Modified: branches/Branch_1_2_0_SP/src/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/Branch_1_2_0_SP/src/etc/server/default/deploy/mssql-persistence-service.xml 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/etc/server/default/deploy/mssql-persistence-service.xml 2007-04-11 16:59:31 UTC (rev 2572)
@@ -75,10 +75,10 @@
<attribute name="DataSource">java:/DefaultDS</attribute>
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID SMALLINT, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID INTEGER, CLUSTERED CHAR(1))
-INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED) VALUES (?, ?, ?, ?, ?, ?, ?)
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID SMALLINT, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID INTEGER, CLSTERED CHAR(1))
+INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED) VALUES (?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
-LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME = ?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME = ?
]]></attribute>
</mbean>
Modified: branches/Branch_1_2_0_SP/src/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/Branch_1_2_0_SP/src/etc/server/default/deploy/sybase-persistence-service.xml 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/etc/server/default/deploy/sybase-persistence-service.xml 2007-04-11 16:59:31 UTC (rev 2572)
@@ -76,10 +76,10 @@
<attribute name="DataSource">java:/DefaultDS</attribute>
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID SMALLINT, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023) NULL, CHANNEL_ID INTEGER, CLUSTERED CHAR(1))
-INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED) VALUES (?, ?, ?, ?, ?, ?, ?)
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JBM_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID SMALLINT, QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023) NULL, CHANNEL_ID INTEGER, CLSTERED CHAR(1))
+INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED) VALUES (?, ?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
-LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME = ?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME = ?
]]></attribute>
</mbean>
Modified: branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/FailoverCommandCenter.java
===================================================================
--- branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/FailoverCommandCenter.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -63,9 +63,12 @@
/**
* Method called by failure detection components (FailoverValveInterceptors and
* ConnectionListeners) when they have reasons to believe that a server failure occured.
+ *
+ * Returns true if the failover command centre handled the exception gracefully and failover completed
+ * or false if it didn't and failover did not occur
*/
- public void failureDetected(Throwable reason, FailureDetector source,
- JMSRemotingConnection remotingConnection)
+ public boolean failureDetected(Throwable reason, FailureDetector source,
+ JMSRemotingConnection remotingConnection)
throws Exception
{
log.debug("failure detected by " + source);
@@ -94,7 +97,9 @@
{
log.debug(this + " ignoring failure detection notification, as failover was " +
"already (or is in process of being) performed on this connection");
- return;
+
+ //Return true since failover already completed ok
+ return true;
}
remotingConnection.setFailed();
@@ -118,9 +123,7 @@
if (res == null)
{
- // No failover attempt was detected on the server side; this might happen if the
- // client side network fails temporarily so the client connection breaks but the
- // server cluster is still up and running - in this case we don't perform failover.
+ // Failover did not occur
failoverSuccessful = false;
}
else
@@ -144,6 +147,8 @@
failoverSuccessful = true;
}
+
+ return failoverSuccessful;
}
catch (Exception e)
{
Modified: branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/container/ClusteringAspect.java
===================================================================
--- branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/container/ClusteringAspect.java 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/container/ClusteringAspect.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -99,7 +99,7 @@
while (attemptCount < MAX_RECONNECT_HOP_COUNT)
{
- // since an exceptiong might be captured during an attempt, this has to be the first
+ // since an exception might be captured during an attempt, this has to be the first
// operation
attemptCount++;
try
@@ -142,7 +142,7 @@
// add a connection listener to detect failure; the consolidated remoting connection
// listener must be already in place and configured
state.getRemotingConnection().getConnectionListener().
- addDelegateListener(new ConnectionFailureListener(fcc, state.getRemotingConnection()));
+ setDelegateListener(new ConnectionFailureListener(fcc, state.getRemotingConnection()));
log.debug(this + " installed failure listener on " + cd);
Modified: branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java
===================================================================
--- branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/container/ConnectionFailureListener.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -6,23 +6,23 @@
*/
package org.jboss.jms.client.container;
-import org.jboss.remoting.ConnectionListener;
-import org.jboss.remoting.Client;
-import org.jboss.logging.Logger;
import org.jboss.jms.client.FailoverCommandCenter;
import org.jboss.jms.client.FailureDetector;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.logging.Logger;
+import org.jboss.remoting.Client;
/**
* The listener that detects a connection failure and initiates the failover process. Each physical
* connection created under the supervision of ClusteredAspect has one of these.
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
*
* $Id$
*/
-public class ConnectionFailureListener implements ConnectionListener, FailureDetector
+public class ConnectionFailureListener implements FailureDetector
{
// Constants ------------------------------------------------------------------------------------
@@ -48,18 +48,24 @@
// ConnectionListener implementation ------------------------------------------------------------
- public void handleConnectionException(Throwable throwable, Client client)
+ /*
+ * Returns true if failover handled the exception gracefully
+ * Returns false if failover was unable to handle the exception and it should be passed
+ * on to any JMS exception listener
+ */
+ public boolean handleConnectionException(Throwable throwable, Client client)
{
try
{
log.debug(this + " is being notified of connection failure: " + throwable);
- fcc.failureDetected(throwable, this, remotingConnection);
-
+ return fcc.failureDetected(throwable, this, remotingConnection);
}
catch (Throwable e)
{
log.error("Caught exception in handling failure", e);
+
+ return false;
}
}
Modified: branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java
===================================================================
--- branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/remoting/ConsolidatedRemotingConnectionListener.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -6,17 +6,15 @@
*/
package org.jboss.jms.client.remoting;
-import org.jboss.remoting.ConnectionListener;
-import org.jboss.remoting.Client;
-import org.jboss.logging.Logger;
-import org.jboss.jms.client.state.ConnectionState;
-
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Iterator;
+import org.jboss.jms.client.container.ConnectionFailureListener;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.logging.Logger;
+import org.jboss.remoting.Client;
+import org.jboss.remoting.ConnectionListener;
+
/**
* The ONLY remoting connection listener for a JMS connection's underlying remoting connection.
* Added to the remoting connection when the JMS connection is created, and removed when the
@@ -44,51 +42,56 @@
private ExceptionListener jmsExceptionListener;
// List<ConnectionListener>
- private List delegateListeners;
+ //private List delegateListeners;
+
+ private ConnectionFailureListener remotingListener;
// Constructors ---------------------------------------------------------------------------------
public ConsolidatedRemotingConnectionListener()
{
- delegateListeners = new ArrayList();
}
// ConnectionListener implementation ------------------------------------------------------------
public void handleConnectionException(Throwable throwable, Client client)
{
- // forward the exception to delegate listeners and JMS ExceptionListeners; synchronize and
- // copy to avoid race conditions
+ // forward the exception to delegate listener and JMS ExceptionListeners; synchronize
+ // to avoid race conditions
ExceptionListener jmsExceptionListenerCopy;
- List delegateListenersCopy = new ArrayList();
+
+ ConnectionFailureListener remotingListenerCopy;
synchronized(this)
{
jmsExceptionListenerCopy = jmsExceptionListener;
- for(Iterator i = delegateListeners.iterator(); i.hasNext(); )
- {
- delegateListenersCopy.add(i.next());
- }
+ remotingListenerCopy = remotingListener;
}
+
+ boolean forwardToJMSListener = true;
- for(Iterator i = delegateListenersCopy.iterator(); i.hasNext(); )
+ if (remotingListenerCopy != null)
{
- ConnectionListener l = (ConnectionListener)i.next();
-
try
{
- log.debug(this + " forwarding remoting failure \"" + throwable + "\" to " + l);
- l.handleConnectionException(throwable, client);
+ log.debug(this + " forwarding remoting failure \"" + throwable + "\" to " + remotingListenerCopy);
+
+ //We only forward to the JMS listener if failover did not successfully handle the exception
+ //If failover handled the exception transparently then there is effectively no problem
+ //with the logical connection that the client needs to be aware of
+ forwardToJMSListener = !remotingListenerCopy.handleConnectionException(throwable, client);
}
catch(Exception e)
{
- log.warn("Failed to forward " + throwable + " to " + l, e);
+ log.warn("Failed to forward " + throwable + " to " + remotingListenerCopy, e);
}
}
-
- if (jmsExceptionListenerCopy != null)
+
+ log.info("DISPATCHING TO JMSLISTENER " + forwardToJMSListener);
+
+ if (forwardToJMSListener && jmsExceptionListenerCopy != null)
{
JMSException jmsException = null;
@@ -118,10 +121,16 @@
// Public ---------------------------------------------------------------------------------------
- public synchronized boolean addDelegateListener(ConnectionListener l)
+ public synchronized void setDelegateListener(ConnectionFailureListener l)
{
- log.debug(this + " adding delegate listener " + l);
- return delegateListeners.add(l);
+ log.debug(this + " setting delegate listener " + l);
+
+ if (remotingListener != null)
+ {
+ throw new IllegalStateException("There is already a connection listener for the connection");
+ }
+
+ remotingListener = l;
}
public synchronized void addJMSExceptionListener(ExceptionListener jmsExceptionListener)
@@ -141,7 +150,7 @@
public synchronized void clear()
{
jmsExceptionListener = null;
- delegateListeners.clear();
+ remotingListener = null;
log.debug(this + " cleared");
}
Modified: branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -343,6 +343,11 @@
{
// very unlikely to get an exception on a local remove (I suspect badly designed API),
// but we're failed anyway, so we don't care too much
+
+ // Actually an exception will always be thrown here if the failure was detected by the connection
+ // validator since the validator will disconnect the client before calling the connection
+ // listener.
+
log.debug(this + " failed to cleanly remove callback manager from the client", t);
}
Modified: branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/ServerPeer.java 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/ServerPeer.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -72,7 +72,6 @@
import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.messaging.util.Util;
import org.jboss.mx.loading.UnifiedClassLoader3;
-import org.jboss.remoting.ServerInvocationHandler;
import org.jboss.remoting.marshal.MarshalFactory;
import org.jboss.system.ServiceCreator;
import org.jboss.system.ServiceMBeanSupport;
@@ -169,10 +168,6 @@
protected ObjectName defaultExpiryQueueObjectName;
protected Queue defaultExpiryQueue;
- //Other stuff
-
- private JMSServerInvocationHandler handler;
-
// Constructors ---------------------------------------------------------------------------------
public ServerPeer(int serverPeerID,
@@ -272,6 +267,10 @@
txRepository.loadPreparedTransactions();
initializeRemoting(mbeanServer);
+
+ //Now everything is started we can tell the invocation handler to start handling invocations
+ //We do this right at the end otherwise it can start handling invocations before we are properly started
+ JMSServerInvocationHandler.setClosed(false);
started = true;
@@ -296,6 +295,11 @@
log.debug(this + " stopping");
started = false;
+
+ //Tell the invocation handler we are closed - this is so we don't attempt to handle
+ //any invocations when we are in a partial closing down state - which can give strange
+ //"object not found with id" exceptions and stuff like that
+ JMSServerInvocationHandler.setClosed(true);
// Stop the wired components
@@ -927,11 +931,6 @@
{
return channelIDManager;
}
-
- public ServerInvocationHandler getInvocationHandler()
- {
- return handler;
- }
public ServerSessionEndpoint getSession(Integer sessionID)
{
@@ -1277,8 +1276,6 @@
JMSWireFormat wf = new JMSWireFormat();
MarshalFactory.addMarshaller("jms", wf, wf);
-
- handler = new JMSServerInvocationHandler();
}
private void loadServerAOPConfig() throws Exception
Modified: branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/Version.java
===================================================================
--- branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/Version.java 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/Version.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -126,7 +126,10 @@
{
Properties versionInfo = new Properties();
- is = getClass().getClassLoader().getResourceAsStream(versionFile);
+ //Note we use the context classloader so this works in a scoped deployment
+
+ is = Thread.currentThread().getContextClassLoader().getResourceAsStream(versionFile);
+
versionInfo.load(is);
String s;
Modified: branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -295,7 +295,8 @@
return;
}
- Class clz = Class.forName(factoryName);
+ //We don't use Class.forName() since then it won't work with scoped deployments
+ Class clz = Thread.currentThread().getContextClassLoader().loadClass(factoryName);
loadBalancingFactory = (LoadBalancingFactory)clz.newInstance();
}
Modified: branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -198,18 +198,8 @@
{
try
{
-// if (!connectionEndpoint.isFailoverConnection())
-// {
- // regular consumer
- return createConsumerDelegateInternal(jmsDestination, selector,
- noLocal, subscriptionName);
- // }
-
-// // we're child of a failover connection. Favor failover channels when creating new
-// // consumers
-// return createFailoverConsumerDelegateInternal(jmsDestination, selector,
-// noLocal, subscriptionName,
-// failoverChannelID);
+ return createConsumerDelegateInternal(jmsDestination, selector,
+ noLocal, subscriptionName);
}
catch (Throwable t)
{
@@ -223,15 +213,7 @@
{
try
{
-// if (!connectionEndpoint.isFailoverConnection())
-// {
- // regular browser
- return createBrowserDelegateInternal(jmsDestination, selector);
-// }
-//
-// // we're child of a failover connection. Favor failover channels when creating new
-// // browsers
-// return createFailoverBrowserDelegateInternal(jmsDestination, selector, failoverChannelID);
+ return createBrowserDelegateInternal(jmsDestination, selector);
}
catch (Throwable t)
{
Modified: branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
===================================================================
--- branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -28,6 +28,7 @@
import javax.management.MBeanServer;
+import org.jboss.jms.util.MessagingJMSException;
import org.jboss.jms.wireformat.ConnectionFactoryCreateConnectionDelegateRequest;
import org.jboss.jms.wireformat.RequestSupport;
import org.jboss.logging.Logger;
@@ -62,14 +63,22 @@
private boolean trace;
+ //We need some way the server peer can call the invocation handler to make it open/closed
+ private static boolean closed = true;
+
+ public static synchronized void setClosed(boolean closed)
+ {
+ JMSServerInvocationHandler.closed = closed;
+ }
+
// Constructors ---------------------------------------------------------------------------------
public JMSServerInvocationHandler()
{
callbackHandlers = new HashMap();
trace = log.isTraceEnabled();
- }
-
+ }
+
// ServerInvocationHandler ----------------------------------------------------------------------
public void setMBeanServer(MBeanServer server)
@@ -92,37 +101,45 @@
public Object invoke(InvocationRequest invocation) throws Throwable
{
if (trace) { log.trace("invoking " + invocation); }
-
- RequestSupport request = (RequestSupport)invocation.getParameter();
- if (request instanceof ConnectionFactoryCreateConnectionDelegateRequest)
- {
- //Create connection request
-
- ConnectionFactoryCreateConnectionDelegateRequest cReq =
- (ConnectionFactoryCreateConnectionDelegateRequest)request;
-
- String remotingSessionId = cReq.getRemotingSessionID();
-
- ServerInvokerCallbackHandler callbackHandler = null;
- synchronized(callbackHandlers)
+ synchronized (JMSServerInvocationHandler.class)
+ {
+ if (closed)
{
- callbackHandler = (ServerInvokerCallbackHandler)callbackHandlers.get(remotingSessionId);
+ throw new MessagingJMSException("Cannot handle invocation since server is not active (it is either starting up or shutting down)");
}
- if (callbackHandler != null)
+
+ RequestSupport request = (RequestSupport)invocation.getParameter();
+
+ if (request instanceof ConnectionFactoryCreateConnectionDelegateRequest)
{
- log.debug("found calllback handler for remoting session " + Util.guidToString(remotingSessionId));
+ //Create connection request
- cReq.setCallbackHandler(callbackHandler);
+ ConnectionFactoryCreateConnectionDelegateRequest cReq =
+ (ConnectionFactoryCreateConnectionDelegateRequest)request;
+
+ String remotingSessionId = cReq.getRemotingSessionID();
+
+ ServerInvokerCallbackHandler callbackHandler = null;
+ synchronized(callbackHandlers)
+ {
+ callbackHandler = (ServerInvokerCallbackHandler)callbackHandlers.get(remotingSessionId);
+ }
+ if (callbackHandler != null)
+ {
+ log.debug("found calllback handler for remoting session " + Util.guidToString(remotingSessionId));
+
+ cReq.setCallbackHandler(callbackHandler);
+ }
+ else
+ {
+ throw new IllegalStateException("Cannot find callback handler " +
+ "for session id " + remotingSessionId);
+ }
}
- else
- {
- throw new IllegalStateException("Cannot find callback handler " +
- "for session id " + remotingSessionId);
- }
+
+ return request.serverInvoke();
}
-
- return request.serverInvoke();
}
public void addListener(InvokerCallbackHandler callbackHandler)
@@ -193,5 +210,10 @@
// Private --------------------------------------------------------------------------------------
+ private synchronized void doSetClosed(boolean closed)
+ {
+ this.closed = true;
+ }
+
// Inner classes --------------------------------------------------------------------------------
}
Copied: branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/filepersist (from rev 2571, trunk/src/main/org/jboss/messaging/core/filepersist)
Deleted: branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -1,168 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.filepersist;
-
-import java.nio.ByteBuffer;
-
-/**
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @version <tt>$Revision$</tt>
- * $Id$
- */
-public class BlockIndex implements Cloneable
-{
-
- // Constants ------------------------------------------------------------------------------------
-
- public static final int REGISTER_SIZE = 4 * 4 + 8 + 1;
-
- // Attributes -----------------------------------------------------------------------------------
-
- /** Immutable attribute */
- private int blockId;
-
- /** Immutable attribute, you can't change a block's size after allocated */
- private int blockSize;
-
- /** Immutable attribute, you can't change where a block is written after */
- private long filePosition;
-
- private boolean confirmed;
-
- private int nextBlock=-1;
-
- private int previousBlock=-1;
-
- // Static ---------------------------------------------------------------------------------------
-
- // Constructors ---------------------------------------------------------------------------------
-
- BlockIndex()
- {
- }
-
- public BlockIndex(int blockId, int blockSize, long filePosition)
- {
- this.blockId = blockId;
- this.blockSize = blockSize;
- this.filePosition = filePosition;
- }
-
- public BlockIndex(int blockId, int blockSize, long filePosition,
- boolean confirmed, int nextBlock, int previousBlock)
- {
- this(blockId, blockSize, filePosition);
- this.confirmed = confirmed;
- this.nextBlock = nextBlock;
- this.previousBlock = previousBlock;
- }
-
- // Public ---------------------------------------------------------------------------------------
-
-
- public int getBlockId()
- {
- return blockId;
- }
-
- public int getBlockSize()
- {
- return blockSize;
- }
-
- public long getFilePosition()
- {
- return filePosition;
- }
-
- public int getNextBlock()
- {
- return nextBlock;
- }
-
- public void setNextBlock(int nextBlock)
- {
- this.nextBlock = nextBlock;
- }
-
-
- public boolean isConfirmed()
- {
- return confirmed;
- }
-
- public void setConfirmed(boolean confirmed)
- {
- this.confirmed = confirmed;
- }
-
-
- public int getPreviousBlock()
- {
- return previousBlock;
- }
-
- public void setPreviousBlock(int previousBlock)
- {
- this.previousBlock = previousBlock;
- }
-
- public void writeToBuffer(ByteBuffer buffer)
- {
- buffer.putInt(nextBlock);
- buffer.putInt(previousBlock);
- buffer.put(confirmed?(byte)1:(byte)0);
- buffer.putLong(filePosition);
- buffer.putInt(blockSize);
- buffer.putInt(blockId);
- }
-
- public void readFromBuffer(ByteBuffer buffer)
- {
- nextBlock = buffer.getInt();
- previousBlock = buffer.getInt();
- confirmed = (buffer.get()==(byte)1);
- filePosition = buffer.getLong();
- blockSize = buffer.getInt();
- blockId = buffer.getInt();
- }
-
- public Object clone()
- {
- return new BlockIndex(blockId, blockSize, filePosition, confirmed, nextBlock, previousBlock);
- }
-
- public String toString()
- {
- return "BlockIndex[" + this.blockId + "]";
- }
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- // Private --------------------------------------------------------------------------------------
-
- // Inner classes --------------------------------------------------------------------------------
-
-}
Copied: branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java (from rev 2571, trunk/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java)
===================================================================
--- branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java (rev 0)
+++ branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/filepersist/BlockIndex.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -0,0 +1,168 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.filepersist;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class BlockIndex implements Cloneable
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ public static final int REGISTER_SIZE = 4 * 4 + 8 + 1;
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ /** Immutable attribute */
+ private int blockId;
+
+ /** Immutable attribute, you can't change a block's size after allocated */
+ private int blockSize;
+
+ /** Immutable attribute, you can't change where a block is written after */
+ private long filePosition;
+
+ private boolean confirmed;
+
+ private int nextBlock=-1;
+
+ private int previousBlock=-1;
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ BlockIndex()
+ {
+ }
+
+ public BlockIndex(int blockId, int blockSize, long filePosition)
+ {
+ this.blockId = blockId;
+ this.blockSize = blockSize;
+ this.filePosition = filePosition;
+ }
+
+ public BlockIndex(int blockId, int blockSize, long filePosition,
+ boolean confirmed, int nextBlock, int previousBlock)
+ {
+ this(blockId, blockSize, filePosition);
+ this.confirmed = confirmed;
+ this.nextBlock = nextBlock;
+ this.previousBlock = previousBlock;
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+
+ public int getBlockId()
+ {
+ return blockId;
+ }
+
+ public int getBlockSize()
+ {
+ return blockSize;
+ }
+
+ public long getFilePosition()
+ {
+ return filePosition;
+ }
+
+ public int getNextBlock()
+ {
+ return nextBlock;
+ }
+
+ public void setNextBlock(int nextBlock)
+ {
+ this.nextBlock = nextBlock;
+ }
+
+
+ public boolean isConfirmed()
+ {
+ return confirmed;
+ }
+
+ public void setConfirmed(boolean confirmed)
+ {
+ this.confirmed = confirmed;
+ }
+
+
+ public int getPreviousBlock()
+ {
+ return previousBlock;
+ }
+
+ public void setPreviousBlock(int previousBlock)
+ {
+ this.previousBlock = previousBlock;
+ }
+
+ public void writeToBuffer(ByteBuffer buffer)
+ {
+ buffer.putInt(nextBlock);
+ buffer.putInt(previousBlock);
+ buffer.put(confirmed?(byte)1:(byte)0);
+ buffer.putLong(filePosition);
+ buffer.putInt(blockSize);
+ buffer.putInt(blockId);
+ }
+
+ public void readFromBuffer(ByteBuffer buffer)
+ {
+ nextBlock = buffer.getInt();
+ previousBlock = buffer.getInt();
+ confirmed = (buffer.get()==(byte)1);
+ filePosition = buffer.getLong();
+ blockSize = buffer.getInt();
+ blockId = buffer.getInt();
+ }
+
+ public Object clone()
+ {
+ return new BlockIndex(blockId, blockSize, filePosition, confirmed, nextBlock, previousBlock);
+ }
+
+ public String toString()
+ {
+ return "BlockIndex[" + this.blockId + "]";
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+}
Deleted: branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/filepersist/DataFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/filepersist/DataFile.java 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/filepersist/DataFile.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -1,270 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.filepersist;
-
-import java.io.RandomAccessFile;
-import java.io.IOException;
-import java.io.File;
-import java.nio.channels.FileChannel;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-/**
- * This class can play with BlockIndex
- * A BlockIndex could be added, and later be confirmed.
- * When a block is confirmed its status is set to confirmed, and the list is then updated
- *
- * (This is just an experiment.. there is a lot of work to do here..
- * For example, we need to support multiple files...
- * and multiple messages in a single block)
- *
- * I think we should support writing messages in blocks, and confirm a single block.
- *
- * At this point confirming a block is a slow operation, but we can improve this... maybe avoiding
- * double linkes lists what forces two updates on each insert
- * (what is easy to support at this point).
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @version <tt>$Revision$</tt>
- * $Id$
- */
-public class DataFile
-{
-
- // Constants ------------------------------------------------------------------------------------
-
- // Attributes -----------------------------------------------------------------------------------
-
- // Lock used for while the index is being updated. You can't have multiple handles
- Object lockIndex = new Object();
-
- // We reuse the same instance of ByteBuffer used on indexes since the access is synchronized
- ByteBuffer indexBuffer = ByteBuffer.allocate(BlockIndex.REGISTER_SIZE);
-
- int numberOfBlocks;
-
- FileChannel indexChannel;
- FileChannel dataChannel;
-
- RandomAccessFile index;
- RandomAccessFile data;
-
- BlockIndex root;
- BlockIndex last;
-
- // Static ---------------------------------------------------------------------------------------
-
- // Constructors ---------------------------------------------------------------------------------
-
- public DataFile(File indexFile, File dataFile) throws IOException
- {
- if (indexFile.exists() && dataFile.exists())
- {
- recover(indexFile, dataFile);
- }
- else
- {
- init(indexFile, dataFile);
- }
- }
-
- // Public ---------------------------------------------------------------------------------------
-
- public BlockIndex addBlock(ByteBuffer dataBuffer) throws IOException
- {
- return addBlock(dataBuffer, null);
- }
-
- /**
- * Use precedent only when adding multiple blocks as part of the same confirmation.
- * Say if you are adding 10 messages, the first message will have no precedence until you
- * confirm it. You don't need to confirm subsequent messages.
- * */
- public BlockIndex addBlock(ByteBuffer dataBuffer, BlockIndex precedent) throws IOException
- {
- BlockIndex block = null;
-
- synchronized (lockIndex)
- {
- block = new BlockIndex(numberOfBlocks++, dataBuffer.capacity(), dataChannel.size());
- if (precedent!=null)
- {
- precedent.setNextBlock(block.getBlockId());
- block.setPreviousBlock(precedent.getBlockId());
- block.setConfirmed(true);
- updateIndex(precedent);
- }
- updateIndex(block);
- }
-
- dataBuffer.rewind();
- dataChannel.write(dataBuffer, block.getFilePosition());
- dataChannel.force(false);
-
- return block;
- }
-
- public void confirmBlock(BlockIndex block) throws IOException
- {
- if (block.isConfirmed())
- {
- throw new IOException("Block already confirmed!");
- }
-
- synchronized (lockIndex)
- {
- last.setNextBlock(block.getBlockId());
- BlockIndex currentBlock = null;
- for (Iterator iter = new IteratorImpl(last);iter.hasNext();)
- {
- currentBlock = (BlockIndex)iter.next();
- }
- if (currentBlock==null)
- {
- throw new IOException("Couldn't find last element on index");
- }
- updateIndex(last);
- last = (BlockIndex)currentBlock.clone();
- }
- }
-
- public BlockIndex readBlock (int blockId) throws IOException
- {
- return readBlock(new BlockIndex(), blockId);
- }
-
- public BlockIndex readBlock(BlockIndex blockToRead, int blockId) throws IOException
- {
- synchronized (lockIndex)
- {
- indexBuffer.rewind();
- indexChannel.read(indexBuffer,getPosition(blockId));
- indexBuffer.rewind();
- blockToRead.readFromBuffer(indexBuffer);
- return blockToRead;
- }
- }
-
- public void close() throws IOException
- {
- index.close();
- data.close();
- }
-
- public long getPosition(int blockId)
- {
- return (blockId+1) * BlockIndex.REGISTER_SIZE;
- }
-
- public Iterator iterateIndexes()
- {
- return new IteratorImpl();
- }
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- protected void recover(File indexFile, File dataFile) throws IOException
- {
- index = new RandomAccessFile(indexFile,"rw");
- data = new RandomAccessFile(dataFile,"rw");
- indexChannel = index.getChannel();
- dataChannel = data.getChannel();
-
- root = readBlock(-1);
- }
-
- protected void init(File indexFile, File dataFile) throws IOException
- {
- index = new RandomAccessFile(indexFile,"rw");
- data = new RandomAccessFile(dataFile,"rw");
- indexChannel = index.getChannel();
- dataChannel = data.getChannel();
- root = new BlockIndex(-1,-1,-1l);
- last = root;
- updateIndex(root);
- }
-
- // Private --------------------------------------------------------------------------------------
-
- /** This method should be called within a synchronized(lockIndex)*/
- private void updateIndex(BlockIndex block) throws IOException
- {
- indexBuffer.rewind();
-
- block.writeToBuffer(indexBuffer);
-
- indexBuffer.rewind();
-
- indexChannel.write(indexBuffer, getPosition(block.getBlockId()));
-
- indexChannel.force(false);
- }
-
- // Inner classes --------------------------------------------------------------------------------
-
- class IteratorImpl implements Iterator
- {
-
- BlockIndex currentBlock;
-
- IteratorImpl()
- {
- currentBlock = (BlockIndex) DataFile.this.root.clone();
- }
-
- IteratorImpl(BlockIndex startAt)
- {
- currentBlock = startAt;
- }
-
- public boolean hasNext()
- {
- return currentBlock.getNextBlock()>=0;
- }
-
- public Object next()
- {
- try
- {
- if (!hasNext())
- {
- throw new IllegalStateException("Already reached end of blocks");
- }
- currentBlock = DataFile.this.readBlock(currentBlock.getNextBlock());
- return currentBlock;
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void remove()
- {
- throw new RuntimeException("Not supported!");
- }
- }
-
-}
Copied: branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/filepersist/DataFile.java (from rev 2571, trunk/src/main/org/jboss/messaging/core/filepersist/DataFile.java)
===================================================================
--- branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/filepersist/DataFile.java (rev 0)
+++ branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/filepersist/DataFile.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -0,0 +1,270 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.filepersist;
+
+import java.io.RandomAccessFile;
+import java.io.IOException;
+import java.io.File;
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+/**
+ * This class can play with BlockIndex
+ * A BlockIndex could be added, and later be confirmed.
+ * When a block is confirmed its status is set to confirmed, and the list is then updated
+ *
+ * (This is just an experiment.. there is a lot of work to do here..
+ * For example, we need to support multiple files...
+ * and multiple messages in a single block)
+ *
+ * I think we should support writing messages in blocks, and confirm a single block.
+ *
+ * At this point confirming a block is a slow operation, but we can improve this... maybe avoiding
+ * double linkes lists what forces two updates on each insert
+ * (what is easy to support at this point).
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class DataFile
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // Lock used for while the index is being updated. You can't have multiple handles
+ Object lockIndex = new Object();
+
+ // We reuse the same instance of ByteBuffer used on indexes since the access is synchronized
+ ByteBuffer indexBuffer = ByteBuffer.allocate(BlockIndex.REGISTER_SIZE);
+
+ int numberOfBlocks;
+
+ FileChannel indexChannel;
+ FileChannel dataChannel;
+
+ RandomAccessFile index;
+ RandomAccessFile data;
+
+ BlockIndex root;
+ BlockIndex last;
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public DataFile(File indexFile, File dataFile) throws IOException
+ {
+ if (indexFile.exists() && dataFile.exists())
+ {
+ recover(indexFile, dataFile);
+ }
+ else
+ {
+ init(indexFile, dataFile);
+ }
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public BlockIndex addBlock(ByteBuffer dataBuffer) throws IOException
+ {
+ return addBlock(dataBuffer, null);
+ }
+
+ /**
+ * Use precedent only when adding multiple blocks as part of the same confirmation.
+ * Say if you are adding 10 messages, the first message will have no precedence until you
+ * confirm it. You don't need to confirm subsequent messages.
+ * */
+ public BlockIndex addBlock(ByteBuffer dataBuffer, BlockIndex precedent) throws IOException
+ {
+ BlockIndex block = null;
+
+ synchronized (lockIndex)
+ {
+ block = new BlockIndex(numberOfBlocks++, dataBuffer.capacity(), dataChannel.size());
+ if (precedent!=null)
+ {
+ precedent.setNextBlock(block.getBlockId());
+ block.setPreviousBlock(precedent.getBlockId());
+ block.setConfirmed(true);
+ updateIndex(precedent);
+ }
+ updateIndex(block);
+ }
+
+ dataBuffer.rewind();
+ dataChannel.write(dataBuffer, block.getFilePosition());
+ dataChannel.force(false);
+
+ return block;
+ }
+
+ public void confirmBlock(BlockIndex block) throws IOException
+ {
+ if (block.isConfirmed())
+ {
+ throw new IOException("Block already confirmed!");
+ }
+
+ synchronized (lockIndex)
+ {
+ last.setNextBlock(block.getBlockId());
+ BlockIndex currentBlock = null;
+ for (Iterator iter = new IteratorImpl(last);iter.hasNext();)
+ {
+ currentBlock = (BlockIndex)iter.next();
+ }
+ if (currentBlock==null)
+ {
+ throw new IOException("Couldn't find last element on index");
+ }
+ updateIndex(last);
+ last = (BlockIndex)currentBlock.clone();
+ }
+ }
+
+ public BlockIndex readBlock (int blockId) throws IOException
+ {
+ return readBlock(new BlockIndex(), blockId);
+ }
+
+ public BlockIndex readBlock(BlockIndex blockToRead, int blockId) throws IOException
+ {
+ synchronized (lockIndex)
+ {
+ indexBuffer.rewind();
+ indexChannel.read(indexBuffer,getPosition(blockId));
+ indexBuffer.rewind();
+ blockToRead.readFromBuffer(indexBuffer);
+ return blockToRead;
+ }
+ }
+
+ public void close() throws IOException
+ {
+ index.close();
+ data.close();
+ }
+
+ public long getPosition(int blockId)
+ {
+ return (blockId+1) * BlockIndex.REGISTER_SIZE;
+ }
+
+ public Iterator iterateIndexes()
+ {
+ return new IteratorImpl();
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ protected void recover(File indexFile, File dataFile) throws IOException
+ {
+ index = new RandomAccessFile(indexFile,"rw");
+ data = new RandomAccessFile(dataFile,"rw");
+ indexChannel = index.getChannel();
+ dataChannel = data.getChannel();
+
+ root = readBlock(-1);
+ }
+
+ protected void init(File indexFile, File dataFile) throws IOException
+ {
+ index = new RandomAccessFile(indexFile,"rw");
+ data = new RandomAccessFile(dataFile,"rw");
+ indexChannel = index.getChannel();
+ dataChannel = data.getChannel();
+ root = new BlockIndex(-1,-1,-1l);
+ last = root;
+ updateIndex(root);
+ }
+
+ // Private --------------------------------------------------------------------------------------
+
+ /** This method should be called within a synchronized(lockIndex)*/
+ private void updateIndex(BlockIndex block) throws IOException
+ {
+ indexBuffer.rewind();
+
+ block.writeToBuffer(indexBuffer);
+
+ indexBuffer.rewind();
+
+ indexChannel.write(indexBuffer, getPosition(block.getBlockId()));
+
+ indexChannel.force(false);
+ }
+
+ // Inner classes --------------------------------------------------------------------------------
+
+ class IteratorImpl implements Iterator
+ {
+
+ BlockIndex currentBlock;
+
+ IteratorImpl()
+ {
+ currentBlock = (BlockIndex) DataFile.this.root.clone();
+ }
+
+ IteratorImpl(BlockIndex startAt)
+ {
+ currentBlock = startAt;
+ }
+
+ public boolean hasNext()
+ {
+ return currentBlock.getNextBlock()>=0;
+ }
+
+ public Object next()
+ {
+ try
+ {
+ if (!hasNext())
+ {
+ throw new IllegalStateException("Already reached end of blocks");
+ }
+ currentBlock = DataFile.this.readBlock(currentBlock.getNextBlock());
+ return currentBlock;
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void remove()
+ {
+ throw new RuntimeException("Not supported!");
+ }
+ }
+
+}
Modified: branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -384,10 +384,13 @@
PersistenceManager pm = serverPeer.getPersistenceManagerInstance();
int nodeId = serverPeer.getServerPeerID();
- Class clazz = Class.forName(messagePullPolicy);
+ // We don't use Class.forName() since then it won't work with scoped deployments
+ Class clazz = Thread.currentThread().getContextClassLoader().loadClass(messagePullPolicy);
+
MessagePullPolicy pullPolicy = (MessagePullPolicy)clazz.newInstance();
- clazz = Class.forName(clusterRouterFactory);
+ clazz = Thread.currentThread().getContextClassLoader().loadClass(clusterRouterFactory);
+
ClusterRouterFactory rf = (ClusterRouterFactory)clazz.newInstance();
ConditionFactory cf = new JMSConditionFactory();
Copied: branches/Branch_1_2_0_SP/tests/src/org/jboss/test/messaging/core/filepersist (from rev 2571, trunk/tests/src/org/jboss/test/messaging/core/filepersist)
Deleted: branches/Branch_1_2_0_SP/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -1,278 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.test.messaging.core.filepersist;
-
-import org.jboss.test.messaging.MessagingTestCase;
-import org.jboss.messaging.core.filepersist.DataFile;
-import org.jboss.messaging.core.filepersist.BlockIndex;
-import java.io.RandomAccessFile;
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.ArrayList;
-import java.util.HashSet;
-
-/**
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @version <tt>$Revision$</tt>
- * $Id$
- */
-public class DataFileTest extends MessagingTestCase
-{
-
- // Constants ------------------------------------------------------------------------------------
-
- File fileIndex = new File("/tmp/file-index.bin");
- File fileData = new File("/tmp/file-data.bin");
-
- // Attributes -----------------------------------------------------------------------------------
-
- // Static ---------------------------------------------------------------------------------------
-
- // Constructors ---------------------------------------------------------------------------------
-
- public DataFileTest(String name)
- {
- super(name);
- }
-
- // Public ---------------------------------------------------------------------------------------
-
- public void testAdd() throws Exception
- {
- DataFile transactioned = new DataFile(fileIndex, fileData);
-
- assertEquals(BlockIndex.REGISTER_SIZE, transactioned.getPosition(0));
-
- ByteBuffer buffer = ByteBuffer.allocateDirect(4 * 5);
- for (int i = 0; i < 5; i++)
- {
- buffer.putInt(i);
- }
- buffer.mark();
-
-
- BlockIndex firstBlock = transactioned.addBlock(buffer);
- BlockIndex currentBlock = firstBlock;
- for (int i = 0; i < 1000; i++)
- {
- currentBlock = transactioned.addBlock(buffer, currentBlock);
- }
-
- Iterator iter = transactioned.iterateIndexes();
- assertFalse(iter.hasNext());
-
- transactioned.confirmBlock(firstBlock);
-
- int elements = 0;
-
- for (iter = transactioned.iterateIndexes(); iter.hasNext();)
- {
- iter.next();
- elements++;
- }
-
- assertEquals(1001, elements);
-
- currentBlock = transactioned.addBlock(buffer);
-
- elements = 0;
-
- for (iter = transactioned.iterateIndexes(); iter.hasNext();)
- {
- iter.next();
- elements++;
- }
-
- assertEquals(1001, elements);
-
- transactioned.confirmBlock(currentBlock);
-
- elements = 0;
-
- for (iter = transactioned.iterateIndexes(); iter.hasNext();)
- {
- iter.next();
- elements++;
- }
-
- assertEquals(1002, elements);
-
- transactioned.close();
- }
-
- public void testAddMultiThread() throws Exception
- {
- final DataFile transactioned = new DataFile(fileIndex, fileData);
- final Object semaphore = new Object();
- final ArrayList failures = new ArrayList();
-
- Thread threads[] = new Thread[20];
-
- for (int i=0;i<threads.length;i++)
- {
- threads[i] = new Thread()
- {
- public void run()
- {
- try
- {
- ByteBuffer buffer = ByteBuffer.allocate(100);
- for (byte i=0;i<100;i++)
- {
- buffer.put(i);
- }
- synchronized (semaphore)
- {
- semaphore.wait();
- }
-
- BlockIndex first = transactioned.addBlock(buffer);
- BlockIndex current = first;
-
- for (byte i=0;i<99;i++)
- {
- current = transactioned.addBlock(buffer, current);
- }
-
- transactioned.confirmBlock(first);
-
- }
- catch (Exception e)
- {
- log.error(e);
- failures.add(e);
- }
- }
- };
-
- }
-
- for (int counter=0;counter<threads.length;counter++)
- {
- threads[counter].start();
- }
-
-
- Thread.sleep(2000);
-
- synchronized (semaphore)
- {
- semaphore.notifyAll();
- }
-
- for (int counter=0;counter<threads.length;counter++)
- {
- threads[counter].join();
- }
-
- if (failures.size()>0)
- {
- throw (Exception) failures.get(0);
- }
-
- int elements = 0;
-
- HashSet set = new HashSet();
-
- for (Iterator iter = transactioned.iterateIndexes(); iter.hasNext();)
- {
- BlockIndex index = (BlockIndex)iter.next();
- set.add(new Integer(index.getBlockId()));
- elements++;
- }
-
- assertEquals(threads.length * 100, elements);
-
- for (int counter = 0; counter < threads.length * 100; counter++)
- {
- Integer intKey = new Integer(counter);
- assertTrue("Could not find intKey=" + intKey, set.contains(intKey));
- }
-
-
- }
-
- /*public void testDeleteme() throws Exception
- {
- File fileIndex = new File("/tmp/file-tmp.bin");
- deleteFile(fileIndex);
- RandomAccessFile file1 = new RandomAccessFile(fileIndex,"rw");
-
- ByteBuffer buffer = ByteBuffer.allocate(100*4);
- for (int i=0;i<100;i++)
- {
- buffer.putInt(i);
- }
-
- System.out.println("pos(1) = " + buffer.position() + " lim = " + buffer.limit());
- buffer.rewind();
- System.out.println("pos(2) = " + buffer.position() + " lim = " + buffer.limit());
-
- for (int i=0;i<100;i++)
- {
- buffer.putInt(i);
- }
-
- buffer.rewind();
- System.out.println("pos(3) = " + buffer.position() + " lim = " + buffer.limit());
-
-
- FileChannel channel = file1.getChannel();
-
- channel.write(buffer);
-
- System.out.println("Size on file1=" + file1.length());
-
- file1.close();
-
-
- } */
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- public void setUp() throws Exception
- {
- super.setUp();
- deleteFile(fileIndex);
- deleteFile(fileData);
- }
-
- protected void deleteFile(File file)
- {
- try
- {
- file.delete();
- }
- catch (Exception e)
- {
-
- }
- }
- // Private --------------------------------------------------------------------------------------
-
- // Inner classes --------------------------------------------------------------------------------
-
-}
Copied: branches/Branch_1_2_0_SP/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java (from rev 2571, trunk/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java)
===================================================================
--- branches/Branch_1_2_0_SP/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java (rev 0)
+++ branches/Branch_1_2_0_SP/tests/src/org/jboss/test/messaging/core/filepersist/DataFileTest.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -0,0 +1,278 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.core.filepersist;
+
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.messaging.core.filepersist.DataFile;
+import org.jboss.messaging.core.filepersist.BlockIndex;
+import java.io.RandomAccessFile;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class DataFileTest extends MessagingTestCase
+{
+
+ // Constants ------------------------------------------------------------------------------------
+
+ File fileIndex = new File("/tmp/file-index.bin");
+ File fileData = new File("/tmp/file-data.bin");
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public DataFileTest(String name)
+ {
+ super(name);
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ public void testAdd() throws Exception
+ {
+ DataFile transactioned = new DataFile(fileIndex, fileData);
+
+ assertEquals(BlockIndex.REGISTER_SIZE, transactioned.getPosition(0));
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(4 * 5);
+ for (int i = 0; i < 5; i++)
+ {
+ buffer.putInt(i);
+ }
+ buffer.mark();
+
+
+ BlockIndex firstBlock = transactioned.addBlock(buffer);
+ BlockIndex currentBlock = firstBlock;
+ for (int i = 0; i < 1000; i++)
+ {
+ currentBlock = transactioned.addBlock(buffer, currentBlock);
+ }
+
+ Iterator iter = transactioned.iterateIndexes();
+ assertFalse(iter.hasNext());
+
+ transactioned.confirmBlock(firstBlock);
+
+ int elements = 0;
+
+ for (iter = transactioned.iterateIndexes(); iter.hasNext();)
+ {
+ iter.next();
+ elements++;
+ }
+
+ assertEquals(1001, elements);
+
+ currentBlock = transactioned.addBlock(buffer);
+
+ elements = 0;
+
+ for (iter = transactioned.iterateIndexes(); iter.hasNext();)
+ {
+ iter.next();
+ elements++;
+ }
+
+ assertEquals(1001, elements);
+
+ transactioned.confirmBlock(currentBlock);
+
+ elements = 0;
+
+ for (iter = transactioned.iterateIndexes(); iter.hasNext();)
+ {
+ iter.next();
+ elements++;
+ }
+
+ assertEquals(1002, elements);
+
+ transactioned.close();
+ }
+
+ public void testAddMultiThread() throws Exception
+ {
+ final DataFile transactioned = new DataFile(fileIndex, fileData);
+ final Object semaphore = new Object();
+ final ArrayList failures = new ArrayList();
+
+ Thread threads[] = new Thread[20];
+
+ for (int i=0;i<threads.length;i++)
+ {
+ threads[i] = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ ByteBuffer buffer = ByteBuffer.allocate(100);
+ for (byte i=0;i<100;i++)
+ {
+ buffer.put(i);
+ }
+ synchronized (semaphore)
+ {
+ semaphore.wait();
+ }
+
+ BlockIndex first = transactioned.addBlock(buffer);
+ BlockIndex current = first;
+
+ for (byte i=0;i<99;i++)
+ {
+ current = transactioned.addBlock(buffer, current);
+ }
+
+ transactioned.confirmBlock(first);
+
+ }
+ catch (Exception e)
+ {
+ log.error(e);
+ failures.add(e);
+ }
+ }
+ };
+
+ }
+
+ for (int counter=0;counter<threads.length;counter++)
+ {
+ threads[counter].start();
+ }
+
+
+ Thread.sleep(2000);
+
+ synchronized (semaphore)
+ {
+ semaphore.notifyAll();
+ }
+
+ for (int counter=0;counter<threads.length;counter++)
+ {
+ threads[counter].join();
+ }
+
+ if (failures.size()>0)
+ {
+ throw (Exception) failures.get(0);
+ }
+
+ int elements = 0;
+
+ HashSet set = new HashSet();
+
+ for (Iterator iter = transactioned.iterateIndexes(); iter.hasNext();)
+ {
+ BlockIndex index = (BlockIndex)iter.next();
+ set.add(new Integer(index.getBlockId()));
+ elements++;
+ }
+
+ assertEquals(threads.length * 100, elements);
+
+ for (int counter = 0; counter < threads.length * 100; counter++)
+ {
+ Integer intKey = new Integer(counter);
+ assertTrue("Could not find intKey=" + intKey, set.contains(intKey));
+ }
+
+
+ }
+
+ /*public void testDeleteme() throws Exception
+ {
+ File fileIndex = new File("/tmp/file-tmp.bin");
+ deleteFile(fileIndex);
+ RandomAccessFile file1 = new RandomAccessFile(fileIndex,"rw");
+
+ ByteBuffer buffer = ByteBuffer.allocate(100*4);
+ for (int i=0;i<100;i++)
+ {
+ buffer.putInt(i);
+ }
+
+ System.out.println("pos(1) = " + buffer.position() + " lim = " + buffer.limit());
+ buffer.rewind();
+ System.out.println("pos(2) = " + buffer.position() + " lim = " + buffer.limit());
+
+ for (int i=0;i<100;i++)
+ {
+ buffer.putInt(i);
+ }
+
+ buffer.rewind();
+ System.out.println("pos(3) = " + buffer.position() + " lim = " + buffer.limit());
+
+
+ FileChannel channel = file1.getChannel();
+
+ channel.write(buffer);
+
+ System.out.println("Size on file1=" + file1.length());
+
+ file1.close();
+
+
+ } */
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ deleteFile(fileIndex);
+ deleteFile(fileData);
+ }
+
+ protected void deleteFile(File file)
+ {
+ try
+ {
+ file.delete();
+ }
+ catch (Exception e)
+ {
+
+ }
+ }
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
+}
Modified: branches/Branch_1_2_0_SP/tests/src/org/jboss/test/messaging/jms/QueueTest.java
===================================================================
--- branches/Branch_1_2_0_SP/tests/src/org/jboss/test/messaging/jms/QueueTest.java 2007-04-11 16:38:25 UTC (rev 2571)
+++ branches/Branch_1_2_0_SP/tests/src/org/jboss/test/messaging/jms/QueueTest.java 2007-04-11 16:59:31 UTC (rev 2572)
@@ -124,21 +124,20 @@
// added for http://jira.jboss.org/jira/browse/JBMESSAGING-899
public void testClosedConsumerAfterStart() throws Exception
{
- Queue queue = (Queue)ic.lookup("/queue/TestQueue");
+ Queue queue = (Queue) ic.lookup("/queue/TestQueue");
- // Maybe we could remove this counter after we are sure this test is fixed!
- // I had to use a counter because this can work in some iterations.
+ // This loop is to increase chances of a failure.
for (int counter = 0; counter < 20; counter++)
{
log.info("Iteration = " + counter);
Connection conn1 = cf.createConnection();
- assertEquals(0, ((JBossConnection)conn1).getServerID());
+ assertEquals(0, ((JBossConnection) conn1).getServerID());
Connection conn2 = cf.createConnection();
- assertEquals(0, ((JBossConnection)conn2).getServerID());
+ assertEquals(0, ((JBossConnection) conn2).getServerID());
try
{
@@ -155,7 +154,8 @@
Session s2 = conn2.createSession(true, Session.AUTO_ACKNOWLEDGE);
- // these next three lines are an anti-pattern but they shouldn't loose any messages
+ // Create a consumer, start the session, close the consumer..
+ // This shouldn't cause any message to be lost
MessageConsumer c2 = s2.createConsumer(queue);
conn2.start();
c2.close();
@@ -166,29 +166,18 @@
//consumer and are cancelled back before delivery to the other consumer has finished.
//There is nothing much we can do about this
Set texts = new HashSet();
-
+
for (int i = 0; i < 20; i++)
{
- TextMessage txt = (TextMessage)c2.receive(5000);
+ TextMessage txt = (TextMessage) c2.receive(5000);
assertNotNull(txt);
- texts.add(txt.getText());
+ texts.add(txt.getText());
}
-
+
for (int i = 0; i < 20; i++)
{
- assertTrue(texts.contains("message " + i));
+ assertTrue(texts.contains("message " + i));
}
-
- // Ovidiu: the test was originally invalid, a locally transacted session that is closed
- // rolls back its transaction. I added s2.commit() to correct the test.
- // JMS 1.1 Specifications, Section 4.3.5:
- // "Closing a connection must roll back the transactions in progress on its
- // transacted sessions*.
- // *) The term 'transacted session' refers to the case where a session's commit and
- // rollback methods are used to demarcate a transaction local to the session. In the
- // case where a session's work is coordinated by an external transaction manager, a
- // session's commit and rollback methods are not used and the result of a closed
- // session's work is determined later by the transaction manager.
s2.commit();
More information about the jboss-cvs-commits
mailing list