JBoss hornetq SVN: r8032 - trunk.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-02 08:36:05 -0400 (Fri, 02 Oct 2009)
New Revision: 8032
Modified:
trunk/.classpath
Log:
HORNETQ-159 - changed jms-ra.rar to be called hornetq-ra.rar and added dependencies to support ResourceAdapater tag
* added jboss-ejb3-ext-api.jar to eclipse's .classpath
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2009-10-02 12:06:29 UTC (rev 8031)
+++ trunk/.classpath 2009-10-02 12:36:05 UTC (rev 8032)
@@ -118,5 +118,6 @@
<classpathentry kind="lib" path="thirdparty/org/jboss/microcontainer/lib/jboss-kernel.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/security/lib/jboss-security-spi.jar"/>
<classpathentry kind="lib" path="thirdparty/wutka-dtdparser/lib/dtdparser121.jar"/>
+ <classpathentry kind="lib" path="thirdparty/org/jboss/ejb3/lib/jboss-ejb3-ext-api.jar"/>
<classpathentry kind="output" path="eclipse-output"/>
</classpath>
14 years, 7 months
JBoss hornetq SVN: r8031 - in trunk: docs/user-manual/en and 11 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-10-02 08:06:29 -0400 (Fri, 02 Oct 2009)
New Revision: 8031
Modified:
trunk/docs/user-manual/en/appserver-integration.xml
trunk/examples/javaee/jca-config/server2/jms-remote-ds.xml
trunk/examples/javaee/mdb-bmt/src/org/hornetq/javaee/example/server/MDB_BMTExample.java
trunk/examples/javaee/mdb-cmt-setrollbackonly/src/org/hornetq/javaee/example/server/MDB_CMT_SetRollbackOnlyExample.java
trunk/examples/javaee/mdb-cmt-tx-local/src/org/hornetq/javaee/example/server/MDB_CMT_TxLocalExample.java
trunk/examples/javaee/mdb-cmt-tx-not-supported/src/org/hornetq/javaee/example/server/MDB_CMT_TxNotSupported.java
trunk/examples/javaee/mdb-cmt-tx-required/src/org/hornetq/javaee/example/server/MDB_CMT_TxRequiredExample.java
trunk/examples/javaee/mdb-message-selector/src/org/hornetq/javaee/example/server/MDBMessageSelectorExample.java
trunk/examples/javaee/mdb-tx-send/src/org/hornetq/javaee/example/server/MDBMessageSendTxExample.java
trunk/hornetq.ipr
trunk/pom.xml
trunk/src/config/jboss-as/build.xml
trunk/src/config/jboss-as/clustered/jms-ds.xml
trunk/src/config/jboss-as/non-clustered/jms-ds.xml
Log:
https://jira.jboss.org/jira/browse/HORNETQ-159 - changed jms-ra.rar to be called hornetq-ra.rar and added dependencies to support ResourceAdapater tag.
Modified: trunk/docs/user-manual/en/appserver-integration.xml
===================================================================
--- trunk/docs/user-manual/en/appserver-integration.xml 2009-10-02 03:39:15 UTC (rev 8030)
+++ trunk/docs/user-manual/en/appserver-integration.xml 2009-10-02 12:06:29 UTC (rev 8031)
@@ -43,6 +43,7 @@
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue")
})
+@ResourceAdapter("hornetq-ra.rar")
public class MDBExample implements MessageListener
{
public void onMessage(Message message)...
@@ -50,6 +51,22 @@
<para>In this example you can see that the MDB will consume messages from a queue that is
mapped into JNDI with the binding <literal>queue/testQueue</literal>. This queue must be
preconfigured in the usual way using the HornetQ configuration files.</para>
+ <para>The <literal>ResourceAdapter</literal> annotation is used to specify which ra adapter
+ should be used. To use this you will need to import <literal
+ >org.jboss.ejb3.annotation.ResourceAdapter</literal> which can be found in the
+ <literal>jboss-ejb3-ext-api.jar</literal> which can be found in the jboss
+ repository. Alternatively you can add use a deployment descriptor and add something like
+ the following to <literal
+ >jboss.xml</literal><programlisting><message-driven>
+ <ejb-name>ExampleMDB</ejb-name>
+ <resource-adapter-name>quartz-ra.rar</resource-adapter-name>
+</message-driven>
+</programlisting>You
+ can also rename the hornetq-ra.rar directory to jms-ra.rar and neither the annotation or
+ the extra descriptor information will be needed. If you do this you will need to edit
+ the <literal>jms-ds.xml</literal> datasource file and change <literal>rar-name</literal>
+ element.</para>
+ <para>All the examples shipped with the HornetQ distribution use the annotation.</para>
<section>
<title>Using Container Managed Transactions</title>
<para>When an MDB is using Container Managed Transactions (CMT), the delivery of the
@@ -66,6 +83,7 @@
})
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.REQUIRED)
+@ResourceAdapter("hornetq-ra.rar")
public class MDB_CMP_TxRequiredExample implements MessageListener
{
public void onMessage(Message message)...
@@ -108,6 +126,7 @@
})
@TransactionManagement(value = TransactionManagementType.CONTAINER)
@TransactionAttribute(value = TransactionAttributeType.NOT_SUPPORTED)
+@ResourceAdapter("hornetq-ra.rar")
public class MDB_CMP_TxLocalExample implements MessageListener
{
public void onMessage(Message message)...
@@ -126,6 +145,7 @@
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Dups-ok-acknowledge")
})
@TransactionManagement(value= TransactionManagementType.BEAN)
+@ResourceAdapter("hornetq-ra.rar")
public class MDB_BMPExample implements MessageListener
{
public void onMessage(Message message)
@@ -177,6 +197,7 @@
})
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.REQUIRED)
+@ResourceAdapter("hornetq-ra.rar")
public class MDBMessageSelectorExample implements MessageListener
{
public void onMessage(Message message)....
@@ -201,6 +222,7 @@
})
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.REQUIRED)
+@ResourceAdapter("hornetq-ra.rar")
public class MDBMessageSendTxExample implements MessageListener
{
@Resource(mappedName = "java:JmsXA")
@@ -637,15 +659,14 @@
as either the source or target server. Or could be on a third, separate JBoss AS
instance.</para>
<para>The bridge can also be used to bridge messages from other non HornetQ JMS servers, as
- long as they are JMS 1.1 compliant.<note>
- <para>Don't confuse a JMS bridge with a core bridge. A JMS bridge can be used to
- bridge any two JMS 1.1 compliant JMS providers and uses the JMS API. A core
- bridge (described in <xref linkend="core-bridges"/>) is used to bridge any two
- HornetQ instances and uses the core API. Always use a core bridge if you can in
- preference to a JMS bridge. The core bridge will typically provide better
- performance than a JMS bridge. Also the core bridge can provide <emphasis>once
- and only once</emphasis> delivery guarantees without using XA.</para>
- </note></para>
+ long as they are JMS 1.1 compliant.<note><para>Don't confuse a JMS bridge with a core
+ bridge. A JMS bridge can be used to bridge any two JMS 1.1 compliant JMS
+ providers and uses the JMS API. A core bridge (described in <xref
+ linkend="core-bridges"/>) is used to bridge any two HornetQ instances and
+ uses the core API. Always use a core bridge if you can in preference to a JMS
+ bridge. The core bridge will typically provide better performance than a JMS
+ bridge. Also the core bridge can provide <emphasis>once and only once</emphasis>
+ delivery guarantees without using XA.</para></note></para>
<para>The bridge has built-in resilience to failure so if the source or target server
connection is lost, e.g. due to network failure, the bridge will retry connecting to the
source and/or target until they come back online. When it comes back online it will
Modified: trunk/examples/javaee/jca-config/server2/jms-remote-ds.xml
===================================================================
--- trunk/examples/javaee/jca-config/server2/jms-remote-ds.xml 2009-10-02 03:39:15 UTC (rev 8030)
+++ trunk/examples/javaee/jca-config/server2/jms-remote-ds.xml 2009-10-02 12:06:29 UTC (rev 8031)
@@ -12,7 +12,7 @@
<tx-connection-factory>
<jndi-name>RemoteJmsXA</jndi-name>
<xa-transaction/>
- <rar-name>jms-ra.rar</rar-name>
+ <rar-name>hornetq-ra.rar</rar-name>
<connection-definition>org.hornetq.ra.HornetQRAConnectionFactory</connection-definition>
<config-property name="SessionDefaultType" type="java.lang.String">javax.jms.Topic</config-property>
<config-property name="ConnectorClassName" type="java.lang.String">org.hornetq.integration.transports.netty.NettyConnectorFactory</config-property>
Modified: trunk/examples/javaee/mdb-bmt/src/org/hornetq/javaee/example/server/MDB_BMTExample.java
===================================================================
--- trunk/examples/javaee/mdb-bmt/src/org/hornetq/javaee/example/server/MDB_BMTExample.java 2009-10-02 03:39:15 UTC (rev 8030)
+++ trunk/examples/javaee/mdb-bmt/src/org/hornetq/javaee/example/server/MDB_BMTExample.java 2009-10-02 12:06:29 UTC (rev 8031)
@@ -12,6 +12,8 @@
*/
package org.hornetq.javaee.example.server;
+import org.jboss.ejb3.annotation.ResourceAdapter;
+
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
@@ -34,6 +36,7 @@
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Dups-ok-acknowledge")
})
@TransactionManagement(value= TransactionManagementType.BEAN)
+@ResourceAdapter("hornetq-ra.rar")
public class MDB_BMTExample implements MessageListener
{
@Resource
Modified: trunk/examples/javaee/mdb-cmt-setrollbackonly/src/org/hornetq/javaee/example/server/MDB_CMT_SetRollbackOnlyExample.java
===================================================================
--- trunk/examples/javaee/mdb-cmt-setrollbackonly/src/org/hornetq/javaee/example/server/MDB_CMT_SetRollbackOnlyExample.java 2009-10-02 03:39:15 UTC (rev 8030)
+++ trunk/examples/javaee/mdb-cmt-setrollbackonly/src/org/hornetq/javaee/example/server/MDB_CMT_SetRollbackOnlyExample.java 2009-10-02 12:06:29 UTC (rev 8031)
@@ -12,6 +12,8 @@
*/
package org.hornetq.javaee.example.server;
+import org.jboss.ejb3.annotation.ResourceAdapter;
+
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
@@ -36,6 +38,7 @@
})
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.REQUIRED)
+@ResourceAdapter("hornetq-ra.rar")
public class MDB_CMT_SetRollbackOnlyExample implements MessageListener
{
@Resource
Modified: trunk/examples/javaee/mdb-cmt-tx-local/src/org/hornetq/javaee/example/server/MDB_CMT_TxLocalExample.java
===================================================================
--- trunk/examples/javaee/mdb-cmt-tx-local/src/org/hornetq/javaee/example/server/MDB_CMT_TxLocalExample.java 2009-10-02 03:39:15 UTC (rev 8030)
+++ trunk/examples/javaee/mdb-cmt-tx-local/src/org/hornetq/javaee/example/server/MDB_CMT_TxLocalExample.java 2009-10-02 12:06:29 UTC (rev 8031)
@@ -12,6 +12,8 @@
*/
package org.hornetq.javaee.example.server;
+import org.jboss.ejb3.annotation.ResourceAdapter;
+
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
@@ -39,6 +41,7 @@
})
@TransactionManagement(value = TransactionManagementType.CONTAINER)
@TransactionAttribute(value = TransactionAttributeType.NOT_SUPPORTED)
+@ResourceAdapter("hornetq-ra.rar")
public class MDB_CMT_TxLocalExample implements MessageListener
{
@Resource(mappedName = "java:/TransactionManager")
Modified: trunk/examples/javaee/mdb-cmt-tx-not-supported/src/org/hornetq/javaee/example/server/MDB_CMT_TxNotSupported.java
===================================================================
--- trunk/examples/javaee/mdb-cmt-tx-not-supported/src/org/hornetq/javaee/example/server/MDB_CMT_TxNotSupported.java 2009-10-02 03:39:15 UTC (rev 8030)
+++ trunk/examples/javaee/mdb-cmt-tx-not-supported/src/org/hornetq/javaee/example/server/MDB_CMT_TxNotSupported.java 2009-10-02 12:06:29 UTC (rev 8031)
@@ -12,6 +12,8 @@
*/
package org.hornetq.javaee.example.server;
+import org.jboss.ejb3.annotation.ResourceAdapter;
+
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
@@ -37,6 +39,7 @@
})
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.NOT_SUPPORTED)
+@ResourceAdapter("hornetq-ra.rar")
public class MDB_CMT_TxNotSupported implements MessageListener
{
@Resource(mappedName = "java:/TransactionManager")
Modified: trunk/examples/javaee/mdb-cmt-tx-required/src/org/hornetq/javaee/example/server/MDB_CMT_TxRequiredExample.java
===================================================================
--- trunk/examples/javaee/mdb-cmt-tx-required/src/org/hornetq/javaee/example/server/MDB_CMT_TxRequiredExample.java 2009-10-02 03:39:15 UTC (rev 8030)
+++ trunk/examples/javaee/mdb-cmt-tx-required/src/org/hornetq/javaee/example/server/MDB_CMT_TxRequiredExample.java 2009-10-02 12:06:29 UTC (rev 8031)
@@ -12,6 +12,8 @@
*/
package org.hornetq.javaee.example.server;
+import org.jboss.ejb3.annotation.ResourceAdapter;
+
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
@@ -38,6 +40,7 @@
})
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.REQUIRED)
+@ResourceAdapter("hornetq-ra.rar")
public class MDB_CMT_TxRequiredExample implements MessageListener
{
@Resource(mappedName = "java:/TransactionManager")
Modified: trunk/examples/javaee/mdb-message-selector/src/org/hornetq/javaee/example/server/MDBMessageSelectorExample.java
===================================================================
--- trunk/examples/javaee/mdb-message-selector/src/org/hornetq/javaee/example/server/MDBMessageSelectorExample.java 2009-10-02 03:39:15 UTC (rev 8030)
+++ trunk/examples/javaee/mdb-message-selector/src/org/hornetq/javaee/example/server/MDBMessageSelectorExample.java 2009-10-02 12:06:29 UTC (rev 8031)
@@ -12,6 +12,8 @@
*/
package org.hornetq.javaee.example.server;
+import org.jboss.ejb3.annotation.ResourceAdapter;
+
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.ejb.TransactionAttribute;
@@ -34,6 +36,7 @@
})
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.REQUIRED)
+@ResourceAdapter("hornetq-ra.rar")
public class MDBMessageSelectorExample implements MessageListener
{
public void onMessage(Message message)
Modified: trunk/examples/javaee/mdb-tx-send/src/org/hornetq/javaee/example/server/MDBMessageSendTxExample.java
===================================================================
--- trunk/examples/javaee/mdb-tx-send/src/org/hornetq/javaee/example/server/MDBMessageSendTxExample.java 2009-10-02 03:39:15 UTC (rev 8030)
+++ trunk/examples/javaee/mdb-tx-send/src/org/hornetq/javaee/example/server/MDBMessageSendTxExample.java 2009-10-02 12:06:29 UTC (rev 8031)
@@ -12,6 +12,8 @@
*/
package org.hornetq.javaee.example.server;
+import org.jboss.ejb3.annotation.ResourceAdapter;
+
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
@@ -40,6 +42,7 @@
})
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.REQUIRED)
+@ResourceAdapter("hornetq-ra.rar")
public class MDBMessageSendTxExample implements MessageListener
{
@Resource(mappedName = "java:JmsXA")
Modified: trunk/hornetq.ipr
===================================================================
--- trunk/hornetq.ipr 2009-10-02 03:39:15 UTC (rev 8030)
+++ trunk/hornetq.ipr 2009-10-02 12:06:29 UTC (rev 8031)
@@ -693,6 +693,7 @@
<root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/javaee/lib/jboss-jaspi-api.jar!/" />
<root url="jar://$PROJECT_DIR$/thirdparty/jboss/jbossts/lib/jbossts-common.jar!/" />
<root url="jar://$PROJECT_DIR$/thirdparty/apache-logging/lib/commons-logging.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/ejb3/lib/jboss-ejb3-ext-api.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES>
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2009-10-02 03:39:15 UTC (rev 8030)
+++ trunk/pom.xml 2009-10-02 12:06:29 UTC (rev 8031)
@@ -264,6 +264,13 @@
<artifactId>commons-logging</artifactId>
<version>1.1.0.jboss</version>
</dependency>
+
+ <!-- needed for javaee examples-->
+ <dependency>
+ <groupId>org.jboss.ejb3</groupId>
+ <artifactId>jboss-ejb3-ext-api</artifactId>
+ <version>1.0.0</version>
+ </dependency>
</dependencies>
Modified: trunk/src/config/jboss-as/build.xml
===================================================================
--- trunk/src/config/jboss-as/build.xml 2009-10-02 03:39:15 UTC (rev 8030)
+++ trunk/src/config/jboss-as/build.xml 2009-10-02 12:06:29 UTC (rev 8031)
@@ -53,7 +53,7 @@
<fileset dir="${src.dir}">
<exclude name="**/messaging/**"/>
<exclude name="conf/login-config.xml"/>
- <exclude name="**/jms-ra.rar"/>
+ <!--<exclude name="**/jms-ra.rar"/>-->
</fileset>
</copy>
<copy todir="${dest.dir}/conf">
@@ -78,7 +78,7 @@
<copy todir="${hornetq.sar.dir}">
<fileset dir="${hornetq.config.dir}" includes="*"/>
</copy>
- <unzip src="${lib.dir}/hornetq-ra.rar" dest="${dest.dir}/deploy/jms-ra.rar"/>
+ <unzip src="${lib.dir}/hornetq-ra.rar" dest="${dest.dir}/deploy/hornetq-ra.rar"/>
</target>
<!-- The JBOSS_HOME environment variable and the directory must exist, we also check we are running
Modified: trunk/src/config/jboss-as/clustered/jms-ds.xml
===================================================================
--- trunk/src/config/jboss-as/clustered/jms-ds.xml 2009-10-02 03:39:15 UTC (rev 8030)
+++ trunk/src/config/jboss-as/clustered/jms-ds.xml 2009-10-02 12:06:29 UTC (rev 8031)
@@ -16,7 +16,7 @@
<tx-connection-factory>
<jndi-name>JmsXA</jndi-name>
<xa-transaction/>
- <rar-name>jms-ra.rar</rar-name>
+ <rar-name>hornetq-ra.rar</rar-name>
<connection-definition>org.hornetq.ra.HornetQRAConnectionFactory</connection-definition>
<config-property name="SessionDefaultType" type="java.lang.String">javax.jms.Topic</config-property>
<config-property name="JmsProviderAdapterJNDI" type="java.lang.String">java:/DefaultJMSProvider</config-property>
Modified: trunk/src/config/jboss-as/non-clustered/jms-ds.xml
===================================================================
--- trunk/src/config/jboss-as/non-clustered/jms-ds.xml 2009-10-02 03:39:15 UTC (rev 8030)
+++ trunk/src/config/jboss-as/non-clustered/jms-ds.xml 2009-10-02 12:06:29 UTC (rev 8031)
@@ -16,7 +16,7 @@
<tx-connection-factory>
<jndi-name>JmsXA</jndi-name>
<xa-transaction/>
- <rar-name>jms-ra.rar</rar-name>
+ <rar-name>hornetq-ra.rar</rar-name>
<connection-definition>org.hornetq.ra.HornetQRAConnectionFactory</connection-definition>
<config-property name="SessionDefaultType" type="java.lang.String">javax.jms.Topic</config-property>
<config-property name="JmsProviderAdapterJNDI" type="java.lang.String">java:/DefaultJMSProvider</config-property>
14 years, 7 months
JBoss hornetq SVN: r8030 - in branches/Replication_Clebert: src/main/org/hornetq/core/journal/impl and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-01 23:39:15 -0400 (Thu, 01 Oct 2009)
New Revision: 8030
Added:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Changes
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -71,6 +71,8 @@
*/
void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception;
+ void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception;
+
void appendRollbackRecord(long txID, boolean sync) throws Exception;
// Load
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -1163,7 +1163,17 @@
{
appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
+ */
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception
+ {
+ appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
+ }
+
+
/**
*
* <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
@@ -3342,7 +3352,12 @@
private static class NullEncoding implements EncodingSupport
{
- static NullEncoding instance = new NullEncoding();
+ private static NullEncoding instance = new NullEncoding();
+
+ public static NullEncoding getInstance()
+ {
+ return instance;
+ }
public void decode(final HornetQBuffer buffer)
{
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -12,7 +12,11 @@
*/
package org.hornetq.core.remoting.impl;
-
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PREPARE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
@@ -76,6 +80,11 @@
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
@@ -366,6 +375,31 @@
packet = new ReplicationAddMessage();
break;
}
+ case REPLICATION_APPEND_TX:
+ {
+ packet = new ReplicationAddTXMessage();
+ break;
+ }
+ case REPLICATION_DELETE:
+ {
+ packet = new ReplicationDeleteMessage();
+ break;
+ }
+ case REPLICATION_DELETE_TX:
+ {
+ packet = new ReplicationDeleteTXMessage();
+ break;
+ }
+ case REPLICATION_PREPARE:
+ {
+ packet = new ReplicationPrepareMessage();
+ break;
+ }
+ case REPLICATION_COMMIT_ROLLBACK:
+ {
+ packet = new ReplicationCommitMessage();
+ break;
+ }
case REPLICATION_RESPONSE:
{
packet = new ReplicationResponseMessage();
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -145,8 +145,18 @@
public static final byte REPLICATION_APPEND = 81;
- public static final byte REPLICATION_DELETE = 82;
+ public static final byte REPLICATION_APPEND_TX = 82;
+ public static final byte REPLICATION_DELETE = 83;
+
+ public static final byte REPLICATION_DELETE_TX = 84;
+
+ public static final byte REPLICATION_PREPARE = 85;
+
+ public static final byte REPLICATION_COMMIT_ROLLBACK = 86;
+
+
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -115,6 +115,11 @@
{
return journalID;
}
+
+ public boolean isUpdate()
+ {
+ return isUpdate;
+ }
/**
* @return the recordType
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddTXMessage.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationAddMessage
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationAddTXMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long txId;
+
+ private long id;
+
+ /** 0 - Bindings, 1 - MessagesJournal */
+ private byte journalID;
+
+ private boolean isUpdate;
+
+ private byte recordType;
+
+ private EncodingSupport encodingData;
+
+ private byte[] recordData;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationAddTXMessage()
+ {
+ super(REPLICATION_APPEND_TX);
+ }
+
+ public ReplicationAddTXMessage(byte journalID, boolean isUpdate, long txId, long id, byte recordType, EncodingSupport encodingData)
+ {
+ this();
+ this.journalID = journalID;
+ this.isUpdate = isUpdate;
+ this.txId = txId;
+ this.id = id;
+ this.recordType = recordType;
+ this.encodingData = encodingData;
+ }
+
+ // Public --------------------------------------------------------
+
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_BOOLEAN +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_INT +
+ (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
+
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(journalID);
+ buffer.writeBoolean(isUpdate);
+ buffer.writeLong(txId);
+ buffer.writeLong(id);
+ buffer.writeByte(recordType);
+ buffer.writeInt(encodingData.getEncodeSize());
+ encodingData.encode(buffer);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ journalID = buffer.readByte();
+ isUpdate = buffer.readBoolean();
+ txId = buffer.readLong();
+ id = buffer.readLong();
+ recordType = buffer.readByte();
+ int size = buffer.readInt();
+ recordData = new byte[size];
+ buffer.readBytes(recordData);
+ }
+
+ /**
+ * @return the id
+ */
+ public long getId()
+ {
+ return id;
+ }
+
+ public long getTxId()
+ {
+ return txId;
+ }
+
+ /**
+ * @return the journalID
+ */
+ public byte getJournalID()
+ {
+ return journalID;
+ }
+
+ public boolean isUpdate()
+ {
+ return isUpdate;
+ }
+
+ /**
+ * @return the recordType
+ */
+ public byte getRecordType()
+ {
+ return recordType;
+ }
+
+ /**
+ * @return the recordData
+ */
+ public byte[] getRecordData()
+ {
+ return recordData;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCommitMessage.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationAddMessage
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationCommitMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ /** 0 - Bindings, 1 - MessagesJournal */
+ private byte journalID;
+
+ private boolean rollback;
+
+ private long txId;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationCommitMessage()
+ {
+ super(REPLICATION_COMMIT_ROLLBACK);
+ }
+
+ public ReplicationCommitMessage(byte journalID, boolean rollback, long txId)
+ {
+ this();
+ this.journalID = journalID;
+ this.rollback = rollback;
+ this.txId = txId;
+ }
+
+ // Public --------------------------------------------------------
+
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE + DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_LONG;
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(journalID);
+ buffer.writeBoolean(rollback);
+ buffer.writeLong(txId);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ journalID = buffer.readByte();
+ rollback = buffer.readBoolean();
+ txId = buffer.readLong();
+ }
+
+ public boolean isRollback()
+ {
+ return rollback;
+ }
+
+ public long getTxId()
+ {
+ return txId;
+ }
+
+ /**
+ * @return the journalID
+ */
+ public byte getJournalID()
+ {
+ return journalID;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -35,15 +35,7 @@
/** 0 - Bindings, 1 - MessagesJournal */
private byte journalID;
-
- private boolean isUpdate;
- private byte recordType;
-
- private EncodingSupport encodingData;
-
- private byte[] recordData;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -53,14 +45,11 @@
super(REPLICATION_DELETE);
}
- public ReplicationDeleteMessage(byte journalID, boolean isUpdate, long id, byte recordType, EncodingSupport encodingData)
+ public ReplicationDeleteMessage(byte journalID, long id)
{
this();
this.journalID = journalID;
- this.isUpdate = isUpdate;
this.id = id;
- this.recordType = recordType;
- this.encodingData = encodingData;
}
// Public --------------------------------------------------------
@@ -69,11 +58,7 @@
{
return BASIC_PACKET_SIZE +
DataConstants.SIZE_BYTE +
- DataConstants.SIZE_BOOLEAN +
- DataConstants.SIZE_LONG +
- DataConstants.SIZE_BYTE +
- DataConstants.SIZE_INT +
- (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
+ DataConstants.SIZE_LONG;
}
@@ -81,23 +66,14 @@
public void encodeBody(final HornetQBuffer buffer)
{
buffer.writeByte(journalID);
- buffer.writeBoolean(isUpdate);
buffer.writeLong(id);
- buffer.writeByte(recordType);
- buffer.writeInt(encodingData.getEncodeSize());
- encodingData.encode(buffer);
}
@Override
public void decodeBody(final HornetQBuffer buffer)
{
journalID = buffer.readByte();
- isUpdate = buffer.readBoolean();
id = buffer.readLong();
- recordType = buffer.readByte();
- int size = buffer.readInt();
- recordData = new byte[size];
- buffer.readBytes(recordData);
}
/**
@@ -116,22 +92,7 @@
return journalID;
}
- /**
- * @return the recordType
- */
- public byte getRecordType()
- {
- return recordType;
- }
- /**
- * @return the recordData
- */
- public byte[] getRecordData()
- {
- return recordData;
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteTXMessage.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationAddMessage
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationDeleteTXMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long txId;
+
+ private long id;
+
+ /** 0 - Bindings, 1 - MessagesJournal */
+ private byte journalID;
+
+ private EncodingSupport encodingData;
+
+ private byte[] recordData;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationDeleteTXMessage()
+ {
+ super(REPLICATION_DELETE_TX);
+ }
+
+ public ReplicationDeleteTXMessage(byte journalID, long txId, long id, EncodingSupport encodingData)
+ {
+ this();
+ this.journalID = journalID;
+ this.txId = txId;
+ this.id = id;
+ this.encodingData = encodingData;
+ }
+
+ // Public --------------------------------------------------------
+
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_INT +
+ (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
+
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(journalID);
+ buffer.writeLong(txId);
+ buffer.writeLong(id);
+ buffer.writeInt(encodingData.getEncodeSize());
+ encodingData.encode(buffer);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ journalID = buffer.readByte();
+ txId = buffer.readLong();
+ id = buffer.readLong();
+ int size = buffer.readInt();
+ recordData = new byte[size];
+ buffer.readBytes(recordData);
+ }
+
+ /**
+ * @return the id
+ */
+ public long getId()
+ {
+ return id;
+ }
+
+ public long getTxId()
+ {
+ return txId;
+ }
+
+ /**
+ * @return the journalID
+ */
+ public byte getJournalID()
+ {
+ return journalID;
+ }
+
+ /**
+ * @return the recordData
+ */
+ public byte[] getRecordData()
+ {
+ return recordData;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationAddMessage
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationPrepareMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long txId;
+
+ /** 0 - Bindings, 1 - MessagesJournal */
+ private byte journalID;
+
+ private EncodingSupport encodingData;
+
+ private byte[] recordData;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationPrepareMessage()
+ {
+ super(REPLICATION_DELETE_TX);
+ }
+
+ public ReplicationPrepareMessage(byte journalID, long txId, EncodingSupport encodingData)
+ {
+ this();
+ this.journalID = journalID;
+ this.txId = txId;
+ this.encodingData = encodingData;
+ }
+
+ // Public --------------------------------------------------------
+
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_INT +
+ (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
+
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(journalID);
+ buffer.writeLong(txId);
+ buffer.writeInt(encodingData.getEncodeSize());
+ encodingData.encode(buffer);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ journalID = buffer.readByte();
+ txId = buffer.readLong();
+ int size = buffer.readInt();
+ recordData = new byte[size];
+ buffer.readBytes(recordData);
+ }
+
+ public long getTxId()
+ {
+ return txId;
+ }
+
+ /**
+ * @return the journalID
+ */
+ public byte getJournalID()
+ {
+ return journalID;
+ }
+
+ /**
+ * @return the recordData
+ */
+ public byte[] getRecordData()
+ {
+ return recordData;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -25,9 +25,9 @@
{
void appendAddRecord(byte journalID, long id, byte recordType, EncodingSupport record) throws Exception;
- void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+ void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport record) throws Exception;
- void appendDeleteRecord(byte journalID, long id, boolean sync) throws Exception;
+ void appendDeleteRecord(byte journalID, long id) throws Exception;
void appendAddRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception;
@@ -37,10 +37,10 @@
void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception;
- void appendCommitRecord(byte journalID, long txID, boolean sync) throws Exception;
+ void appendCommitRecord(byte journalID, long txID) throws Exception;
- void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData, boolean sync) throws Exception;
+ void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData) throws Exception;
- void appendRollbackRecord(byte journalID, long txID, boolean sync) throws Exception;
+ void appendRollbackRecord(byte journalID, long txID) throws Exception;
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -19,9 +19,13 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
@@ -49,7 +53,7 @@
private Journal bindingsJournal;
private Journal messagingJournal;
-
+
private JournalStorageManager storage;
// Static --------------------------------------------------------
@@ -71,9 +75,28 @@
{
if (packet.getType() == PacketImpl.REPLICATION_APPEND)
{
- System.out.println("Replicated");
handleAppendAddRecord(packet);
}
+ else if (packet.getType() == PacketImpl.REPLICATION_APPEND_TX)
+ {
+ handleAppendAddTXRecord(packet);
+ }
+ else if (packet.getType() == PacketImpl.REPLICATION_DELETE)
+ {
+ handleAppendDelete(packet);
+ }
+ else if (packet.getType() == PacketImpl.REPLICATION_DELETE_TX)
+ {
+ handleAppendDeleteTX(packet);
+ }
+ else if (packet.getType() == PacketImpl.REPLICATION_PREPARE)
+ {
+ handlePrepare(packet);
+ }
+ else if (packet.getType() == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
+ {
+ handleCommitRollback(packet);
+ }
}
catch (Exception e)
{
@@ -97,14 +120,14 @@
public void start() throws Exception
{
Configuration config = server.getConfiguration();
-
+
// TODO: this needs an executor
JournalStorageManager storage = new JournalStorageManager(config, null);
storage.start();
-
+
this.bindingsJournal = storage.getBindingsJournal();
this.messagingJournal = storage.getBindingsJournal();
-
+
// We only need to load internal structures on the backup...
storage.loadInternalOnly();
}
@@ -138,26 +161,128 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+ /**
+ * @param packet
+ */
+ private void handleCommitRollback(Packet packet) throws Exception
+ {
+ ReplicationCommitMessage commitMessage = (ReplicationCommitMessage)packet;
+ Journal journalToUse = getJournal(commitMessage.getJournalID());
+
+
+ if (commitMessage.isRollback())
+ {
+ journalToUse.appendRollbackRecord(commitMessage.getTxId(), false);
+ }
+ else
+ {
+ journalToUse.appendCommitRecord(commitMessage.getTxId(), false);
+ }
+ }
+
/**
* @param packet
+ */
+ private void handlePrepare(Packet packet) throws Exception
+ {
+ ReplicationPrepareMessage prepareMessage = (ReplicationPrepareMessage)packet;
+
+ Journal journalToUse = getJournal(prepareMessage.getJournalID());
+
+ journalToUse.appendPrepareRecord(prepareMessage.getTxId(), prepareMessage.getRecordData(), false);
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleAppendDeleteTX(Packet packet) throws Exception
+ {
+ ReplicationDeleteTXMessage deleteMessage = (ReplicationDeleteTXMessage)packet;
+
+ Journal journalToUse = getJournal(deleteMessage.getJournalID());
+
+ journalToUse.appendDeleteRecordTransactional(deleteMessage.getTxId(),
+ deleteMessage.getId(),
+ deleteMessage.getRecordData());
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleAppendDelete(Packet packet) throws Exception
+ {
+ ReplicationDeleteMessage deleteMessage = (ReplicationDeleteMessage)packet;
+
+ Journal journalToUse = getJournal(deleteMessage.getJournalID());
+
+ journalToUse.appendDeleteRecord(deleteMessage.getId(), false);
+ }
+
+ /**
+ * @param packet
+ */
+ private void handleAppendAddTXRecord(Packet packet) throws Exception
+ {
+ ReplicationAddTXMessage addMessage = (ReplicationAddTXMessage)packet;
+
+ Journal journalToUse = getJournal(addMessage.getJournalID());
+
+ if (addMessage.isUpdate())
+ {
+ journalToUse.appendUpdateRecordTransactional(addMessage.getTxId(),
+ addMessage.getId(),
+ addMessage.getRecordType(),
+ addMessage.getRecordData());
+ }
+ else
+ {
+ journalToUse.appendAddRecordTransactional(addMessage.getTxId(),
+ addMessage.getId(),
+ addMessage.getRecordType(),
+ addMessage.getRecordData());
+ }
+ }
+
+ /**
+ * @param packet
* @throws Exception
*/
private void handleAppendAddRecord(Packet packet) throws Exception
{
ReplicationAddMessage addMessage = (ReplicationAddMessage)packet;
- Journal journalToUse;
- if (addMessage.getJournalID() == (byte)0)
+ Journal journalToUse = getJournal(addMessage.getJournalID());
+
+ if (addMessage.isUpdate())
{
+ journalToUse.appendUpdateRecord(addMessage.getId(),
+ addMessage.getRecordType(),
+ addMessage.getRecordData(),
+ false);
+ }
+ else
+ {
+ journalToUse.appendAddRecord(addMessage.getId(), addMessage.getRecordType(), addMessage.getRecordData(), false);
+ }
+ }
+
+ /**
+ * @param journalID
+ * @return
+ */
+ private Journal getJournal(byte journalID)
+ {
+ Journal journalToUse;
+ if (journalID == (byte)0)
+ {
journalToUse = bindingsJournal;
}
else
{
journalToUse = messagingJournal;
}
-
- journalToUse.appendAddRecord(addMessage.getId(), addMessage.getRecordType(), addMessage.getRecordData(), false);
+ return journalToUse;
}
// Inner classes -------------------------------------------------
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -19,6 +19,7 @@
import org.hornetq.core.client.impl.ConnectionManager;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
@@ -27,6 +28,12 @@
import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.replication.ReplicationToken;
@@ -46,7 +53,7 @@
// Attributes ----------------------------------------------------
// TODO: where should this be configured?
- private static final int WINDOW_SIZE = 100 * 1024;
+ private static final int WINDOW_SIZE = 1024 * 1024;
private final ResponseHandler responseHandler = new ResponseHandler();
@@ -88,43 +95,52 @@
* @see org.hornetq.core.replication.ReplicationManager#replicate(byte[], org.hornetq.core.replication.ReplicationToken)
*/
- public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport encodingData)
+ public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport record)
{
- sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType, encodingData));
+ sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType, record));
}
-
-
+
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecord(byte, long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
*/
- public void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport encodingData, boolean sync) throws Exception
+ public void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport record) throws Exception
{
- sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType, encodingData));
+ sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType, record));
}
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecord(byte, long, boolean)
*/
- public void appendDeleteRecord(byte journalID, long id, boolean sync) throws Exception
+ public void appendDeleteRecord(byte journalID, long id) throws Exception
{
- // TODO Auto-generated method stub
-
+ sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
}
+ public void appendAddRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception
+ {
+ sendReplicatePacket(new ReplicationAddTXMessage(journalID, false, txID, id, recordType, record));
+ }
- public void appendAddRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecordTransactional(byte, long, long, byte, org.hornetq.core.journal.EncodingSupport)
+ */
+ public void appendUpdateRecordTransactional(byte journalID,
+ long txID,
+ long id,
+ byte recordType,
+ EncodingSupport record) throws Exception
{
-
+ sendReplicatePacket(new ReplicationAddTXMessage(journalID, true, txID, id, recordType, record));
}
+
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendCommitRecord(byte, long, boolean)
*/
- public void appendCommitRecord(byte journalID, long txID, boolean sync) throws Exception
+ public void appendCommitRecord(byte journalID, long txID) throws Exception
{
- // TODO Auto-generated method stub
-
+ sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
}
/* (non-Javadoc)
@@ -132,8 +148,7 @@
*/
public void appendDeleteRecordTransactional(byte journalID, long txID, long id, EncodingSupport record) throws Exception
{
- // TODO Auto-generated method stub
-
+ sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, record));
}
/* (non-Javadoc)
@@ -141,46 +156,26 @@
*/
public void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception
{
- // TODO Auto-generated method stub
-
+ sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, NullEncoding.instance));
}
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendPrepareRecord(byte, long, org.hornetq.core.journal.EncodingSupport, boolean)
*/
- public void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData, boolean sync) throws Exception
+ public void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData) throws Exception
{
- // TODO Auto-generated method stub
-
+ sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID, transactionData));
}
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendRollbackRecord(byte, long, boolean)
*/
- public void appendRollbackRecord(byte journalID, long txID, boolean sync) throws Exception
+ public void appendRollbackRecord(byte journalID, long txID) throws Exception
{
- // TODO Auto-generated method stub
-
+ sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
}
/* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecordTransactional(byte, long, long, byte, org.hornetq.core.journal.EncodingSupport)
- */
- public void appendUpdateRecordTransactional(byte journalID,
- long txID,
- long id,
- byte recordType,
- EncodingSupport record) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
-
-
-
-
- /* (non-Javadoc)
* @see org.hornetq.core.server.HornetQComponent#isStarted()
*/
public synchronized boolean isStarted()
@@ -202,7 +197,7 @@
this.replicatingChannel = connection.getChannel(channelID, WINDOW_SIZE, false);
this.replicatingChannel.setHandler(this.responseHandler);
-
+
CreateReplicationSessionMessage replicationStartPackage = new CreateReplicationSessionMessage(channelID,
WINDOW_SIZE);
@@ -312,4 +307,24 @@
}
+ private static class NullEncoding implements EncodingSupport
+ {
+
+ static NullEncoding instance = new NullEncoding();
+
+ public void decode(final HornetQBuffer buffer)
+ {
+ }
+
+ public void encode(final HornetQBuffer buffer)
+ {
+ }
+
+ public int getEncodeSize()
+ {
+ return 0;
+ }
+
+ }
+
}
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-01 22:37:37 UTC (rev 8029)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-02 03:39:15 UTC (rev 8030)
@@ -146,10 +146,18 @@
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
manager.start();
- for (int i = 0; i < 100; i++)
- {
- manager.appendAddRecord((byte)0, i, (byte)1, new DataImplement());
- }
+
+ manager.appendAddRecord((byte)0, 1, (byte)1, new FakeData());
+ manager.appendUpdateRecord((byte)0, 1, (byte)2, new FakeData());
+ manager.appendDeleteRecord((byte)0, 1);
+ manager.appendAddRecordTransactional((byte)0, 2, 2, (byte)1, new FakeData());
+ manager.appendUpdateRecordTransactional((byte)0, 2, 2, (byte)2, new FakeData());
+ manager.appendCommitRecord((byte)0, 2);
+
+ manager.appendDeleteRecordTransactional((byte)0, 3, 4,new FakeData());
+ manager.appendPrepareRecord((byte)0, 3, new FakeData());
+ manager.appendRollbackRecord((byte)0, 3);
+
final CountDownLatch latch = new CountDownLatch(1);
manager.getReplicationToken().addFutureCompletion(new Runnable()
{
@@ -169,7 +177,7 @@
}
}
- class DataImplement implements EncodingSupport
+ class FakeData implements EncodingSupport
{
public void decode(HornetQBuffer buffer)
14 years, 7 months
JBoss hornetq SVN: r8029 - trunk/tests/src/org/hornetq/tests/integration/jms/divert.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-01 18:37:37 -0400 (Thu, 01 Oct 2009)
New Revision: 8029
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
Log:
small tweak
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-10-01 22:18:38 UTC (rev 8028)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-10-01 22:37:37 UTC (rev 8029)
@@ -90,6 +90,30 @@
connection.close();
}
+ public void testClientACK() throws Exception
+ {
+ HornetQQueue queueSource = (HornetQQueue)createQueue("Source");
+ HornetQQueue queueTarget = (HornetQQueue)createQueue("Dest");
+
+
+ Connection connection = cf.createConnection();
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ final MessageProducer producer = session.createProducer(queueSource);
+
+ final TextMessage message = session.createTextMessage("message text");
+ producer.send(message);
+
+ connection.start();
+
+ final MessageConsumer consumer = session.createConsumer(queueTarget);
+ TextMessage receivedMessage = (TextMessage)consumer.receive(1000);
+ assertNotNull(receivedMessage);
+ receivedMessage.acknowledge();
+
+ connection.close();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
14 years, 7 months
JBoss hornetq SVN: r8028 - trunk/tests/src/org/hornetq/tests/integration/jms/divert.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-01 18:18:38 -0400 (Thu, 01 Oct 2009)
New Revision: 8028
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
Log:
Adding failure condition on test
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-10-01 22:12:50 UTC (rev 8027)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-10-01 22:18:38 UTC (rev 8028)
@@ -13,17 +13,38 @@
package org.hornetq.tests.integration.jms.divert;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
+
import java.util.ArrayList;
+import java.util.List;
import javax.jms.*;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.cluster.DivertConfiguration;
import org.hornetq.jms.HornetQQueue;
import org.hornetq.tests.util.JMSTestBase;
+import org.hornetq.utils.Pair;
/**
* A DivertAndACKClientTest
@@ -50,6 +71,7 @@
HornetQQueue queueSource = (HornetQQueue)createQueue("Source");
HornetQQueue queueTarget = (HornetQQueue)createQueue("Dest");
+
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -94,12 +116,52 @@
divertList.add(divert);
config.setDivertConfigurations(divertList);
-
+
return config;
}
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+ protected void createCF(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
+ List<String> jndiBindings) throws Exception
+ {
+ int retryInterval = 1000;
+ double retryIntervalMultiplier = 1.0;
+ int reconnectAttempts = -1;
+ boolean failoverOnServerShutdown = true;
+ int callTimeout = 30000;
+ jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest",
+ connectorConfigs,
+ null,
+ DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ DEFAULT_CONNECTION_TTL,
+ callTimeout,
+ DEFAULT_MAX_CONNECTIONS,
+ DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
+ DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ DEFAULT_CONSUMER_WINDOW_SIZE,
+ DEFAULT_CONSUMER_MAX_RATE,
+ DEFAULT_PRODUCER_WINDOW_SIZE,
+ DEFAULT_PRODUCER_MAX_RATE,
+ false, // TODO: set this to true, and the test will fail
+ DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+ DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+ DEFAULT_AUTO_GROUP,
+ DEFAULT_PRE_ACKNOWLEDGE,
+ DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_USE_GLOBAL_POOLS,
+ DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
+ retryInterval,
+ retryIntervalMultiplier,
+ DEFAULT_MAX_RETRY_INTERVAL,
+ reconnectAttempts,
+ failoverOnServerShutdown,
+ jndiBindings);
+ }
+
}
14 years, 7 months
JBoss hornetq SVN: r8027 - in trunk/tests/src/org/hornetq/tests: integration/jms/divert and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-01 18:12:50 -0400 (Thu, 01 Oct 2009)
New Revision: 8027
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/divert/
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
Modified:
trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-165 - Adding test to replicate it
Added: trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-10-01 22:12:50 UTC (rev 8027)
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.divert;
+
+import java.util.ArrayList;
+
+import javax.jms.*;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.cluster.DivertConfiguration;
+import org.hornetq.jms.HornetQQueue;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ * A DivertAndACKClientTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class DivertAndACKClientTest extends JMSTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testAutoACK() throws Exception
+ {
+ HornetQQueue queueSource = (HornetQQueue)createQueue("Source");
+ HornetQQueue queueTarget = (HornetQQueue)createQueue("Dest");
+
+ Connection connection = cf.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ final MessageProducer producer = session.createProducer(queueSource);
+
+ final TextMessage message = session.createTextMessage("message text");
+ producer.send(message);
+
+ connection.start();
+
+ final MessageConsumer consumer = session.createConsumer(queueTarget);
+ TextMessage receivedMessage = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(receivedMessage);
+
+ connection.close();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected boolean usePersistence()
+ {
+ return true;
+ }
+
+
+ protected Configuration createDefaultConfig(final boolean netty)
+ {
+ Configuration config = super.createDefaultConfig(netty);
+
+ DivertConfiguration divert = new DivertConfiguration("local-divert",
+ "some-name",
+ "jms.queue.Source",
+ "jms.queue.Dest",
+ true,
+ null,
+ null);
+
+ ArrayList<DivertConfiguration> divertList = new ArrayList<DivertConfiguration>();
+ divertList.add(divert);
+
+ config.setDivertConfigurations(divertList);
+
+ return config;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-10-01 21:02:07 UTC (rev 8026)
+++ trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-10-01 22:12:50 UTC (rev 8027)
@@ -122,7 +122,7 @@
conf.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName()));
- server = HornetQ.newHornetQServer(conf, false);
+ server = HornetQ.newHornetQServer(conf, usePersistence());
jmsServer = new JMSServerManagerImpl(server);
context = new InVMContext();
Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-10-01 21:02:07 UTC (rev 8026)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-10-01 22:12:50 UTC (rev 8027)
@@ -250,6 +250,8 @@
configuration.setLargeMessagesDirectory(getLargeMessagesDir());
configuration.setJournalCompactMinFiles(0);
configuration.setJournalCompactPercentage(0);
+
+ configuration.setFileDeploymentEnabled(false);
configuration.setJournalType(JournalType.ASYNCIO);
14 years, 7 months
JBoss hornetq SVN: r8026 - in branches/Replication_Clebert: src/main/org/hornetq/core/remoting/impl and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-01 17:02:07 -0400 (Thu, 01 Oct 2009)
New Revision: 8026
Added:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java
Modified:
branches/Replication_Clebert/.classpath
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
backup changes
Modified: branches/Replication_Clebert/.classpath
===================================================================
--- branches/Replication_Clebert/.classpath 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/.classpath 2009-10-01 21:02:07 UTC (rev 8026)
@@ -97,7 +97,7 @@
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="tests/tmpfiles"/>
<classpathentry kind="lib" path="thirdparty/net/java/dev/javacc/lib/javacc.jar"/>
- <classpathentry kind="lib" path="thirdparty/org/jboss/netty/lib/netty.jar"/>
+ <classpathentry kind="lib" path="thirdparty/org/jboss/netty/lib/netty.jar" sourcepath="/netty-3.1.3.GA"/>
<classpathentry kind="lib" path="thirdparty/log4j/lib/log4j.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/naming/lib/jnpserver.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/security/lib/jbosssx.jar"/>
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -13,6 +13,7 @@
package org.hornetq.core.remoting.impl;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
@@ -75,6 +76,7 @@
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -364,6 +366,11 @@
packet = new ReplicationAddMessage();
break;
}
+ case REPLICATION_RESPONSE:
+ {
+ packet = new ReplicationResponseMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -28,16 +28,17 @@
// Constants -------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(PacketImpl.class);
-
+
// The minimal size for all the packets, Common data for all the packets (look at PacketImpl.encode)
- protected static final int BASIC_PACKET_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
+ protected static final int BASIC_PACKET_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_LONG;
private long channelID;
private final byte type;
-
+
private int size;
-
+
// The packet types
// -----------------------------------------------------------------------------------
@@ -64,15 +65,14 @@
public static final byte CREATE_QUEUE = 34;
public static final byte DELETE_QUEUE = 35;
-
+
public static final byte CREATE_REPLICATION = 36;
-
// Session
public static final byte SESS_CREATECONSUMER = 40;
public static final byte SESS_ACKNOWLEDGE = 41;
-
+
public static final byte SESS_EXPIRED = 42;
public static final byte SESS_COMMIT = 43;
@@ -128,7 +128,7 @@
public static final byte SESS_FLOWTOKEN = 70;
public static final byte SESS_SEND = 71;
-
+
public static final byte SESS_SEND_LARGE = 72;
public static final byte SESS_SEND_CONTINUATION = 73;
@@ -138,11 +138,15 @@
public static final byte SESS_RECEIVE_MSG = 75;
public static final byte SESS_RECEIVE_CONTINUATION = 76;
-
+
// Replication
-
- public static final byte REPLICATION_APPEND = 80;
+ public static final byte REPLICATION_RESPONSE = 80;
+
+ public static final byte REPLICATION_APPEND = 81;
+
+ public static final byte REPLICATION_DELETE = 82;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
@@ -152,7 +156,6 @@
// Public --------------------------------------------------------
-
public byte getType()
{
return type;
@@ -167,40 +170,40 @@
{
this.channelID = channelID;
}
-
+
public int encode(final HornetQBuffer buffer)
{
// The standard header fields
buffer.writeInt(0); // The length gets filled in at the end
buffer.writeByte(type);
buffer.writeLong(channelID);
-
+
encodeBody(buffer);
size = buffer.writerIndex();
-
+
// The length doesn't include the actual length byte
int len = size - DataConstants.SIZE_INT;
buffer.setInt(0, len);
-
+
return size;
}
public void decode(final HornetQBuffer buffer)
{
channelID = buffer.readLong();
-
+
decodeBody(buffer);
-
+
size = buffer.readerIndex();
}
-
+
public final int getPacketSize()
{
return size;
}
-
+
public int getRequiredBufferSize()
{
return BASIC_PACKET_SIZE;
@@ -256,7 +259,7 @@
{
return DataConstants.SIZE_INT + str.length() * 2;
}
-
+
protected int nullableStringEncodeSize(String str)
{
return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0);
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -35,6 +35,8 @@
/** 0 - Bindings, 1 - MessagesJournal */
private byte journalID;
+
+ private boolean isUpdate;
private byte recordType;
@@ -51,10 +53,11 @@
super(REPLICATION_APPEND);
}
- public ReplicationAddMessage(byte journalID, long id, byte recordType, EncodingSupport encodingData)
+ public ReplicationAddMessage(byte journalID, boolean isUpdate, long id, byte recordType, EncodingSupport encodingData)
{
this();
this.journalID = journalID;
+ this.isUpdate = isUpdate;
this.id = id;
this.recordType = recordType;
this.encodingData = encodingData;
@@ -64,7 +67,9 @@
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
+ return BASIC_PACKET_SIZE +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_LONG +
DataConstants.SIZE_BYTE +
DataConstants.SIZE_INT +
@@ -76,6 +81,7 @@
public void encodeBody(final HornetQBuffer buffer)
{
buffer.writeByte(journalID);
+ buffer.writeBoolean(isUpdate);
buffer.writeLong(id);
buffer.writeByte(recordType);
buffer.writeInt(encodingData.getEncodeSize());
@@ -86,6 +92,7 @@
public void decodeBody(final HornetQBuffer buffer)
{
journalID = buffer.readByte();
+ isUpdate = buffer.readBoolean();
id = buffer.readLong();
recordType = buffer.readByte();
int size = buffer.readInt();
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationDeleteMessage.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationAddMessage
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationDeleteMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long id;
+
+ /** 0 - Bindings, 1 - MessagesJournal */
+ private byte journalID;
+
+ private boolean isUpdate;
+
+ private byte recordType;
+
+ private EncodingSupport encodingData;
+
+ private byte[] recordData;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationDeleteMessage()
+ {
+ super(REPLICATION_DELETE);
+ }
+
+ public ReplicationDeleteMessage(byte journalID, boolean isUpdate, long id, byte recordType, EncodingSupport encodingData)
+ {
+ this();
+ this.journalID = journalID;
+ this.isUpdate = isUpdate;
+ this.id = id;
+ this.recordType = recordType;
+ this.encodingData = encodingData;
+ }
+
+ // Public --------------------------------------------------------
+
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_BOOLEAN +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_INT +
+ (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
+
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(journalID);
+ buffer.writeBoolean(isUpdate);
+ buffer.writeLong(id);
+ buffer.writeByte(recordType);
+ buffer.writeInt(encodingData.getEncodeSize());
+ encodingData.encode(buffer);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ journalID = buffer.readByte();
+ isUpdate = buffer.readBoolean();
+ id = buffer.readLong();
+ recordType = buffer.readByte();
+ int size = buffer.readInt();
+ recordData = new byte[size];
+ buffer.readBytes(recordData);
+ }
+
+ /**
+ * @return the id
+ */
+ public long getId()
+ {
+ return id;
+ }
+
+ /**
+ * @return the journalID
+ */
+ public byte getJournalID()
+ {
+ return journalID;
+ }
+
+ /**
+ * @return the recordType
+ */
+ public byte getRecordType()
+ {
+ return recordType;
+ }
+
+ /**
+ * @return the recordData
+ */
+ public byte[] getRecordData()
+ {
+ return recordData;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationResponseMessage.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+
+/**
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class ReplicationResponseMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationResponseMessage()
+ {
+ super(REPLICATION_RESPONSE);
+ }
+
+ // Public --------------------------------------------------------
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.remoting.Packet#getRequiredBufferSize()
+ */
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE;
+ }
+
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -16,13 +16,31 @@
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.server.HornetQComponent;
-
/**
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
*
*/
-public interface ReplicationManager extends HornetQComponent
+public interface ReplicationManager extends HornetQComponent
{
- void appendAddRecord(byte journalID, long id, byte recordType, EncodingSupport record);
+ void appendAddRecord(byte journalID, long id, byte recordType, EncodingSupport record) throws Exception;
+
+ void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+
+ void appendDeleteRecord(byte journalID, long id, boolean sync) throws Exception;
+
+ void appendAddRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+
+ void appendUpdateRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+
+ void appendDeleteRecordTransactional(byte journalID, long txID, long id, EncodingSupport record) throws Exception;
+
+ void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception;
+
+ void appendCommitRecord(byte journalID, long txID, boolean sync) throws Exception;
+
+ void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData, boolean sync) throws Exception;
+
+ void appendRollbackRecord(byte journalID, long txID, boolean sync) throws Exception;
+
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -22,6 +22,7 @@
import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
@@ -79,7 +80,7 @@
// TODO: what to do when the IO fails on the backup side? should we shutdown the backup?
log.warn(e.getMessage(), e);
}
- channel.send(new NullResponseMessage());
+ channel.send(new ReplicationResponseMessage());
}
/* (non-Javadoc)
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -90,10 +90,97 @@
public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport encodingData)
{
- sendReplicatePacket(new ReplicationAddMessage(journalID, id, recordType, encodingData));
+ sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType, encodingData));
}
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecord(byte, long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
+ */
+ public void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport encodingData, boolean sync) throws Exception
+ {
+ sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType, encodingData));
+ }
/* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecord(byte, long, boolean)
+ */
+ public void appendDeleteRecord(byte journalID, long id, boolean sync) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+
+
+ public void appendAddRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendCommitRecord(byte, long, boolean)
+ */
+ public void appendCommitRecord(byte journalID, long txID, boolean sync) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecordTransactional(byte, long, long, org.hornetq.core.journal.EncodingSupport)
+ */
+ public void appendDeleteRecordTransactional(byte journalID, long txID, long id, EncodingSupport record) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecordTransactional(byte, long, long)
+ */
+ public void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendPrepareRecord(byte, long, org.hornetq.core.journal.EncodingSupport, boolean)
+ */
+ public void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData, boolean sync) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendRollbackRecord(byte, long, boolean)
+ */
+ public void appendRollbackRecord(byte journalID, long txID, boolean sync) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecordTransactional(byte, long, long, byte, org.hornetq.core.journal.EncodingSupport)
+ */
+ public void appendUpdateRecordTransactional(byte journalID,
+ long txID,
+ long id,
+ byte recordType,
+ EncodingSupport record) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+
+
+
+
+ /* (non-Javadoc)
* @see org.hornetq.core.server.HornetQComponent#isStarted()
*/
public synchronized boolean isStarted()
@@ -217,7 +304,7 @@
public void handlePacket(Packet packet)
{
System.out.println("HandlePacket on client");
- if (packet.getType() == PacketImpl.NULL_RESPONSE)
+ if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
{
replicated();
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -52,7 +52,7 @@
/** To be called by the replication manager, when data is confirmed on the channel */
public synchronized void replicated()
{
- if (pendings-- == 0)
+ if (--pendings == 0)
{
if (tasks != null)
{
@@ -63,13 +63,12 @@
tasks.clear();
}
}
- System.out.println("pendings (replicated) = " + pendings);
}
/** You may have several actions to be done after a replication operation is completed. */
public synchronized void addFutureCompletion(Runnable runnable)
{
- System.out.println("pendings = " + pendings);
+ System.out.println("pendings on addFutureCompletion = " + pendings);
if (pendings == 0)
{
executor.execute(runnable);
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -603,10 +603,10 @@
if (replicationEndpoint == null)
{
replicationEndpoint = new ReplicationEndpointImpl(this);
+ replicationEndpoint.setChannel(channel);
replicationEndpoint.start();
}
- replicationEndpoint.setChannel(channel);
return replicationEndpoint;
}
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-01 14:01:21 UTC (rev 8025)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-01 21:02:07 UTC (rev 8026)
@@ -146,7 +146,10 @@
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
manager.start();
- manager.appendAddRecord((byte)0, 1, (byte)1, new DataImplement());
+ for (int i = 0; i < 100; i++)
+ {
+ manager.appendAddRecord((byte)0, i, (byte)1, new DataImplement());
+ }
final CountDownLatch latch = new CountDownLatch(1);
manager.getReplicationToken().addFutureCompletion(new Runnable()
{
14 years, 7 months
JBoss hornetq SVN: r8025 - in trunk: src/main/org/hornetq/core/management/impl and 2 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-01 10:01:21 -0400 (Thu, 01 Oct 2009)
New Revision: 8025
Modified:
trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java
Log:
HORNETQ-151: an empty string must be considered as a null filter from management API
applied patch by Sergej Z
* allow empty filter strings and treat it as a null filter
* added overloaded methods without the filter parameter
* tests
Modified: trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/HornetQServerControl.java 2009-10-01 13:22:19 UTC (rev 8024)
+++ trunk/src/main/org/hornetq/core/management/HornetQServerControl.java 2009-10-01 14:01:21 UTC (rev 8025)
@@ -132,12 +132,17 @@
@Operation(desc = "Create a queue with the specified address", impact = ACTION)
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name) throws Exception;
-
+
@Operation(desc = "Create a queue", impact = ACTION)
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "filter", desc = "Filter of the queue") String filter,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable) throws Exception;
+
+ @Operation(desc = "Create a queue with the specified address, name and durability", impact = ACTION)
+ void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
+ @Parameter(name = "name", desc = "Name of the queue") String name,
+ @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable) throws Exception;
@Operation(desc = "Deploy a queue", impact = ACTION)
void deployQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-10-01 13:22:19 UTC (rev 8024)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-10-01 14:01:21 UTC (rev 8025)
@@ -265,10 +265,19 @@
{
server.createQueue(new SimpleString(address), new SimpleString(name), null, true, false);
}
+
+ public void createQueue(final String address, final String name, final boolean durable) throws Exception
+ {
+ server.createQueue(new SimpleString(address), new SimpleString(name), null, durable, false);
+ }
public void createQueue(final String address, final String name, final String filterStr, final boolean durable) throws Exception
{
- SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
+ SimpleString filter = null;
+ if (filterStr != null && !filterStr.trim().equals(""))
+ {
+ filter = new SimpleString(filterStr);
+ }
server.createQueue(new SimpleString(address), new SimpleString(name), filter, durable, false);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2009-10-01 13:22:19 UTC (rev 8024)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2009-10-01 14:01:21 UTC (rev 8025)
@@ -200,7 +200,32 @@
checkNoResource(ObjectNames.getQueueObjectName(address, name));
}
+
+ public void testCreateAndDestroyQueue_3() throws Exception
+ {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString name = RandomUtil.randomSimpleString();
+ boolean durable = true;
+ HornetQServerControl serverControl = createManagementControl();
+
+ checkNoResource(ObjectNames.getQueueObjectName(address, name));
+
+ serverControl.createQueue(address.toString(), name.toString(), durable);
+
+ checkResource(ObjectNames.getQueueObjectName(address, name));
+ QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
+ assertEquals(address.toString(), queueControl.getAddress());
+ assertEquals(name.toString(), queueControl.getName());
+ assertNull(queueControl.getFilter());
+ assertEquals(durable, queueControl.isDurable());
+ assertEquals(false, queueControl.isTemporary());
+
+ serverControl.destroyQueue(name.toString());
+
+ checkNoResource(ObjectNames.getQueueObjectName(address, name));
+ }
+
public void testCreateAndDestroyQueueWithNullFilter() throws Exception
{
SimpleString address = RandomUtil.randomSimpleString();
@@ -227,6 +252,32 @@
checkNoResource(ObjectNames.getQueueObjectName(address, name));
}
+ public void testCreateAndDestroyQueueWithEmptyStringForFilter() throws Exception
+ {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString name = RandomUtil.randomSimpleString();
+ String filter = "";
+ boolean durable = true;
+
+ HornetQServerControl serverControl = createManagementControl();
+
+ checkNoResource(ObjectNames.getQueueObjectName(address, name));
+
+ serverControl.createQueue(address.toString(), name.toString(), filter, durable);
+
+ checkResource(ObjectNames.getQueueObjectName(address, name));
+ QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
+ assertEquals(address.toString(), queueControl.getAddress());
+ assertEquals(name.toString(), queueControl.getName());
+ assertNull(queueControl.getFilter());
+ assertEquals(durable, queueControl.isDurable());
+ assertEquals(false, queueControl.isTemporary());
+
+ serverControl.destroyQueue(name.toString());
+
+ checkNoResource(ObjectNames.getQueueObjectName(address, name));
+ }
+
public void testGetQueueNames() throws Exception
{
SimpleString address = RandomUtil.randomSimpleString();
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2009-10-01 13:22:19 UTC (rev 8024)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2009-10-01 14:01:21 UTC (rev 8025)
@@ -108,6 +108,11 @@
proxy.invokeOperation("createQueue", address, name, filter, durable);
}
+ public void createQueue(String address, String name, boolean durable) throws Exception
+ {
+ proxy.invokeOperation("createQueue", address, name, durable);
+ }
+
public void deployQueue(String address, String name, String filter, boolean durable) throws Exception
{
proxy.invokeOperation("deployQueue", address, name, filter, durable);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java 2009-10-01 13:22:19 UTC (rev 8024)
+++ trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java 2009-10-01 14:01:21 UTC (rev 8025)
@@ -159,7 +159,12 @@
{
}
+
+ public void createQueue(String address, String name, boolean durable) throws Exception
+ {
+ }
+
public String[] getAddressNames()
{
return null;
14 years, 7 months
JBoss hornetq SVN: r8024 - trunk/src/main/org/hornetq/core/deployers/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-01 09:22:19 -0400 (Thu, 01 Oct 2009)
New Revision: 8024
Modified:
trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java
Log:
HORNETQ-122: FileDeploymentManager does not differentiate between File and Ear supplied hornetq-jms.xml
* do no check file existence using File.exists() as it does not work when the resource is inside an EAR.
Instead, check that the file is in the current thread context classloader's resources
Modified: trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java 2009-10-01 09:12:08 UTC (rev 8023)
+++ trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java 2009-10-01 13:22:19 UTC (rev 8024)
@@ -224,12 +224,12 @@
for (Map.Entry<Pair<URL, Deployer>, DeployInfo> entry : deployed.entrySet())
{
Pair<URL, Deployer> pair = entry.getKey();
- if (!new File(pair.a.getFile()).exists())
+ if (!fileExists(pair.a))
{
try
{
Deployer deployer = entry.getValue().deployer;
- log.debug("Undeploying " + deployer + " with url " + pair.a);
+ log.info("Undeploying " + deployer + " with url " + pair.a);
deployer.undeploy(pair.a);
toRemove.add(pair);
}
@@ -259,6 +259,36 @@
{
return deployed;
}
+
+ // Private -------------------------------------------------------
+
+ /**
+ * Checks if the URL is among the current thread context class loader's resources.
+ *
+ * We do not check that the corresponding file exists using File.exists() directly as it would fail
+ * in the case the resource is loaded from inside an EAR file (see https://jira.jboss.org/jira/browse/HORNETQ-122)
+ */
+ private boolean fileExists(URL resourceURL)
+ {
+ try
+ {
+ File f = new File(resourceURL.getPath());
+ Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(f.getName());
+ while (resources.hasMoreElements())
+ {
+ URL url = (URL)resources.nextElement();
+ if (url.equals(resourceURL))
+ {
+ return true;
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ return false;
+ }
+ return false;
+ }
// Inner classes -------------------------------------------------------------------------------------------
14 years, 7 months
JBoss hornetq SVN: r8023 - trunk/src/main/org/hornetq/core/remoting/impl.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-10-01 05:12:08 -0400 (Thu, 01 Oct 2009)
New Revision: 8023
Modified:
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
Log:
fixed hanging test, slight improvement to fix
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-10-01 09:09:08 UTC (rev 8022)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-10-01 09:12:08 UTC (rev 8023)
@@ -346,8 +346,10 @@
if (sendSemaphore != null)
{
- //Any threads blocking on the send semaphore should be allowed to return
- sendSemaphore.release(Integer.MAX_VALUE - sendSemaphore.availablePermits());
+ //Any threads blocking on the send semaphore should be allowed to return - we do this by just giving it
+ //a lot of permits - note we don't give it Integer.MAX_VALUE since then if if more releases come in that
+ //could end up with permit count going -ve which would cause subsequent sends to block
+ sendSemaphore.release(Integer.MAX_VALUE / 2);
}
if (!connection.isDestroyed() && !connection.removeChannel(id))
14 years, 7 months