JBoss hornetq SVN: r9668 - in trunk: examples/common/src/org/hornetq/common/example and 38 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-10 13:44:13 -0400 (Fri, 10 Sep 2010)
New Revision: 9668
Merging fixes from Branch_2_1, r9591-r9608, r9612-r9659, r9660-9663
Modified: trunk/build-hornetq.xml
--- trunk/build-hornetq.xml 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/build-hornetq.xml 2010-09-10 17:44:13 UTC (rev 9668)
@@ -87,6 +87,7 @@
<property name="resources.jar.name" value="hornetq-resources.jar"/>
<property name="resources.sources.jar.name" value="hornetq-resources-sources.jar"/>
<property name="twitter4j.jar.name" value="twitter4j-core.jar"/>
+ <property name="eap.examples.zip.name" value="hornetq-eap-examples.zip"/>
<!--source and build dirs-->
<property name="build.dir" value="build"/>
@@ -1326,8 +1327,30 @@
<gzip src="${build.dir}/${build.artifact}.tar"
+ <target name="eap-examples" description="Generates a file with examples tuned for the JBoss EAP" depends="init">
+ <mkdir dir="${build.dir}/eap-examples-tmp"/>
+ <copy todir="${build.dir}/eap-examples-tmp">
+ <fileset dir="${examples.dir}" excludes="**/build.sh,**/build.bat, **/twitter-connector/**"/>
+ </copy>
+ <copy todir="${build.dir}/eap-examples-tmp" overwrite="true">
+ <fileset dir="examples-eap"/>
+ </copy>
+ <replace dir="${build.dir}/eap-examples-tmp" token="hornetq-ra.rar" value="jms-ra.rar"></replace>
+ <zip destfile="${build.jars.dir}/${eap.examples.zip.name}">
+ <zipfileset dir="${build.dir}/eap-examples-tmp" prefix="examples/hornetq"/>
+ </zip>
+ <delete dir="${build.dir}/eap-examples-tmp"/>
+ </target>
<target name="source-distro">
<mkdir dir="${build.dir}"/>
<zip destfile="${build.dir}/${build.artifact}-src.zip">
Modified: trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java
--- trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -17,8 +17,6 @@
import java.util.Properties;
import java.util.logging.Logger;
-import javax.jms.Connection;
-import javax.jms.JMSException;
import javax.naming.InitialContext;
@@ -32,26 +30,26 @@
private Process[] servers;
- private Connection conn;
+ protected boolean failure = false;
- private boolean failure = false;
+ protected String serverClasspath;
- private String serverClasspath;
+ protected String serverProps;
- private String serverProps;
public abstract boolean runExample() throws Exception;
private boolean logServerOutput;
- private String[] configs;
+ protected String[] configs;
+ protected boolean runServer;
protected void run(final String[] configs)
String runServerProp = System.getProperty("hornetq.example.runServer");
String logServerOutputProp = System.getProperty("hornetq.example.logserveroutput");
serverClasspath = System.getProperty("hornetq.example.server.classpath");
- boolean runServer = runServerProp == null ? true : Boolean.valueOf(runServerProp);
+ runServer = runServerProp == null ? true : Boolean.valueOf(runServerProp);
logServerOutput = logServerOutputProp == null ? false : Boolean.valueOf(logServerOutputProp);
serverProps = System.getProperty("hornetq.example.server.args");
if (System.getProperty("hornetq.example.server.override.args") != null)
@@ -83,17 +81,6 @@
- if (conn != null)
- {
- try
- {
- conn.close();
- }
- catch (JMSException e)
- {
- // ignore
- }
- }
if (runServer)
@@ -165,7 +152,7 @@
- private void startServers() throws Exception
+ protected void startServers() throws Exception
servers = new Process[configs.length];
for (int i = 0; i < configs.length; i++)
@@ -174,7 +161,7 @@
- private void stopServers() throws Exception
+ protected void stopServers() throws Exception
for (Process server : servers)
@@ -185,7 +172,7 @@
- private void stopServer(final Process server) throws Exception
+ protected void stopServer(final Process server) throws Exception
if (!System.getProperty("os.name").contains("Windows") && !System.getProperty("os.name").contains("Mac OS X"))
Added: trunk/examples/soak/tx-restarts/README
--- trunk/examples/soak/tx-restarts/README (rev 0)
+++ trunk/examples/soak/tx-restarts/README 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,29 @@
+* Soak Test For TX survival over restarts
+Run The Test
+To run the test simply use ./build.sh
+It's important that you always clean the data directory before starting the test, as it will validate for sequences generated.
+The test will start and stop a server multiple times.
+Run the server remotely
+You can start the server directly if you want, you can just start the server as:
+./run.sh PATH_TO_HORNETQ/examples/soak/tx-restarts/server0
+Then you can run the test as:
+./build.sh runRemote
+And you can now kill and restart the server manually as many times as you want.
Added: trunk/examples/soak/tx-restarts/build.bat
--- trunk/examples/soak/tx-restarts/build.bat (rev 0)
+++ trunk/examples/soak/tx-restarts/build.bat 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,13 @@
+@echo off
+set "OVERRIDE_ANT_HOME=..\..\..\tools\ant"
+if exist "..\..\..\src\bin\build.bat" (
+ rem running from TRUNK
+ call ..\..\..\src\bin\build.bat %*
+) else (
+ rem running from the distro
+ call ..\..\..\bin\build.bat %*
Added: trunk/examples/soak/tx-restarts/build.sh
--- trunk/examples/soak/tx-restarts/build.sh (rev 0)
+++ trunk/examples/soak/tx-restarts/build.sh 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,15 @@
+if [ -f "../../../src/bin/build.sh" ]; then
+ # running from TRUNK
+ ../../../src/bin/build.sh "$@"
+ # running from the distro
+ ../../../bin/build.sh "$@"
Added: trunk/examples/soak/tx-restarts/build.xml
--- trunk/examples/soak/tx-restarts/build.xml (rev 0)
+++ trunk/examples/soak/tx-restarts/build.xml 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+ <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+ ]>
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+<project default="run" name="TX-Restarts soak test">
+ <import file="../../common/build.xml"/>
+ <property file="ant.properties"/>
+ <target name="run">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.hornetq.jms.example.TXRestartSoak"/>
+ <param name="java-min-memory" value="1G"/>
+ <param name="java-max-memory" value="1G"/>
+ </antcall>
+ </target>
+ <target name="runRemote">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.hornetq.jms.example.TXRestartSoak"/>
+ <param name="hornetq.example.runServer" value="false"/>
+ </antcall>
+ </target>
Added: trunk/examples/soak/tx-restarts/server0/client-jndi.properties
--- trunk/examples/soak/tx-restarts/server0/client-jndi.properties (rev 0)
+++ trunk/examples/soak/tx-restarts/server0/client-jndi.properties 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,3 @@
Added: trunk/examples/soak/tx-restarts/server0/hornetq-beans.xml
--- trunk/examples/soak/tx-restarts/server0/hornetq-beans.xml (rev 0)
+++ trunk/examples/soak/tx-restarts/server0/hornetq-beans.xml 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">1099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">1098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager" class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+ <!-- The core server -->
+ <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
Added: trunk/examples/soak/tx-restarts/server0/hornetq-configuration.xml
--- trunk/examples/soak/tx-restarts/server0/hornetq-configuration.xml (rev 0)
+++ trunk/examples/soak/tx-restarts/server0/hornetq-configuration.xml 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,65 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+ <journal-file-size>102400</journal-file-size>
+ <!-- Connectors -->
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
+ </connector>
+ </connectors>
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+ <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
+ </acceptor>
+ </acceptors>
+ <address-settings>
+ <address-setting match="jms.queue.#">
+ <max-delivery-attempts>-1</max-delivery-attempts>
+ <!-- <max-size-bytes>335544320000</max-size-bytes> -->
+ <max-size-bytes>33554432</max-size-bytes>
+ <page-size-bytes>16777216</page-size-bytes>
+ <address-full-policy>PAGE</address-full-policy>
+ </address-setting>
+ </address-settings>
+ <diverts>
+ <divert name="div1">
+ <address>jms.queue.inputQueue</address>
+ <forwarding-address>jms.queue.diverted1</forwarding-address>
+ <exclusive>true</exclusive>
+ </divert>
+ <divert name="div2">
+ <address>jms.queue.inputQueue</address>
+ <forwarding-address>jms.queue.diverted2</forwarding-address>
+ <exclusive>true</exclusive>
+ </divert>
+ </diverts>
+ <!-- Other config -->
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.#">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
Added: trunk/examples/soak/tx-restarts/server0/hornetq-jms.xml
--- trunk/examples/soak/tx-restarts/server0/hornetq-jms.xml (rev 0)
+++ trunk/examples/soak/tx-restarts/server0/hornetq-jms.xml 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,27 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connectors>
+ <connector-ref connector-name="netty-connector"/>
+ </connectors>
+ <min-large-message-size>100240</min-large-message-size>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+ <!--the queue used by the example-->
+ <queue name="inputQueue">
+ <entry name="/queue/inputQueue"/>
+ </queue>
+ <queue name="diverted1">
+ <entry name="/queue/diverted1"/>
+ </queue>
+ <queue name="diverted2">
+ <entry name="/queue/diverted2"/>
+ </queue>
Added: trunk/examples/soak/tx-restarts/server0/hornetq-users.xml
--- trunk/examples/soak/tx-restarts/server0/hornetq-users.xml (rev 0)
+++ trunk/examples/soak/tx-restarts/server0/hornetq-users.xml 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
\ No newline at end of file
Added: trunk/examples/soak/tx-restarts/server0/jndi.properties
--- trunk/examples/soak/tx-restarts/server0/jndi.properties (rev 0)
+++ trunk/examples/soak/tx-restarts/server0/jndi.properties 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,2 @@
\ No newline at end of file
Added: trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java
--- trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java (rev 0)
+++ trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,278 @@
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Properties;
+import java.util.logging.Logger;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import org.hornetq.core.transaction.impl.XidImpl;
+import org.hornetq.utils.UUIDGenerator;
+ * WARNING: This is not a sample on how you should handle XA.
+ * You are supposed to use a TransactionManager.
+ * This class is doing the job of a TransactionManager that fits for the purpose of this test only,
+ * however there are many more pitfalls to deal with Transactions.
+ *
+ * This is just to stress and soak test Transactions with HornetQ.
+ *
+ * And this is dealing with XA directly for the purpose testing only.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class ClientAbstract extends Thread
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(ClientAbstract.class.getName());
+ // Attributes ----------------------------------------------------
+ protected InitialContext ctx;
+ protected XAConnection conn;
+ protected XASession sess;
+ protected XAResource activeXAResource;
+ protected Xid activeXid;
+ protected volatile boolean running = true;
+ protected volatile int errors = 0;
+ /**
+ * A commit was called
+ * case we don't find the Xid, means it was accepted
+ */
+ protected volatile boolean pendingCommit = false;
+ // Static --------------------------------------------------------
+ // Constructors --------------------------------------------------
+ // Public --------------------------------------------------------
+ protected InitialContext getContext(final int serverId) throws Exception
+ {
+ String jndiFilename = "server" + serverId + "/client-jndi.properties";
+ File jndiFile = new File(jndiFilename);
+ Properties props = new Properties();
+ FileInputStream inStream = null;
+ try
+ {
+ inStream = new FileInputStream(jndiFile);
+ props.load(inStream);
+ }
+ finally
+ {
+ if (inStream != null)
+ {
+ inStream.close();
+ }
+ }
+ return new InitialContext(props);
+ }
+ public XAConnection getConnection()
+ {
+ return conn;
+ }
+ public int getErrorsCount()
+ {
+ return errors;
+ }
+ public final void connect()
+ {
+ while (running)
+ {
+ try
+ {
+ disconnect();
+ ctx = getContext(0);
+ XAConnectionFactory cf = (XAConnectionFactory)ctx.lookup("/ConnectionFactory");
+ conn = cf.createXAConnection();
+ sess = conn.createXASession();
+ activeXAResource = sess.getXAResource();
+ if (activeXid != null)
+ {
+ synchronized (ClientAbstract.class)
+ {
+ Xid[] xids = activeXAResource.recover(XAResource.TMSTARTRSCAN);
+ boolean found = false;
+ for (Xid recXid : xids)
+ {
+ if (recXid.equals(activeXid))
+ {
+ // System.out.println("Calling commit after a prepare on " + this);
+ found = true;
+ callCommit();
+ }
+ }
+ if (!found)
+ {
+ if (pendingCommit)
+ {
+ System.out.println("Doing a commit based on a pending commit on " + this);
+ onCommit();
+ }
+ else
+ {
+ System.out.println("Doing a rollback on " + this);
+ onRollback();
+ }
+ activeXid = null;
+ pendingCommit = false;
+ }
+ }
+ }
+ connectClients();
+ break;
+ }
+ catch (Exception e)
+ {
+ ClientAbstract.log.warning("Can't connect to server, retrying");
+ disconnect();
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException ignored)
+ {
+ // if an interruption was sent, we will respect it and leave the loop
+ break;
+ }
+ }
+ }
+ }
+ @Override
+ public void run()
+ {
+ connect();
+ }
+ protected void callCommit() throws Exception
+ {
+ pendingCommit = true;
+ activeXAResource.commit(activeXid, false);
+ pendingCommit = false;
+ activeXid = null;
+ onCommit();
+ }
+ protected void callPrepare() throws Exception
+ {
+ activeXAResource.prepare(activeXid);
+ }
+ public void beginTX() throws Exception
+ {
+ activeXid = newXID();
+ activeXAResource.start(activeXid, XAResource.TMNOFLAGS);
+ }
+ public void endTX() throws Exception
+ {
+ activeXAResource.end(activeXid, XAResource.TMSUCCESS);
+ callPrepare();
+ callCommit();
+ }
+ public void setRunning(final boolean running)
+ {
+ this.running = running;
+ }
+ /**
+ * @return
+ */
+ private XidImpl newXID()
+ {
+ return new XidImpl("tst".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+ }
+ protected abstract void connectClients() throws Exception;
+ protected abstract void onCommit();
+ protected abstract void onRollback();
+ public void disconnect()
+ {
+ try
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ ignored.printStackTrace();
+ }
+ try
+ {
+ if (ctx != null)
+ {
+ ctx.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ ignored.printStackTrace();
+ }
+ ctx = null;
+ conn = null;
+ // it's not necessary to close the session as conn.close() will already take care of that
+ sess = null;
+ }
+ // Package protected ---------------------------------------------
+ // Protected -----------------------------------------------------
+ // Private -------------------------------------------------------
+ // Inner classes -------------------------------------------------
Added: trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java
--- trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java (rev 0)
+++ trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,192 @@
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import org.hornetq.utils.ReusableLatch;
+ * A Receiver
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class Receiver extends ClientAbstract
+ // Constants -----------------------------------------------------
+ // Attributes ----------------------------------------------------
+ private Queue queue;
+ // We should leave some messages on paging. We don't want to consume all for this test
+ private final Semaphore minConsume = new Semaphore(0);
+ private final ReusableLatch latchMax = new ReusableLatch(0);
+ private static final int MAX_DIFF = 10000;
+ // The difference between producer and consuming
+ private final AtomicInteger currentDiff = new AtomicInteger(0);
+ private final String queueJNDI;
+ protected long msgs = 0;
+ protected int pendingMsgs = 0;
+ protected int pendingSemaphores = 0;
+ protected MessageConsumer cons;
+ // Static --------------------------------------------------------
+ // Constructors --------------------------------------------------
+ public Receiver(String queueJNDI)
+ {
+ super();
+ this.queueJNDI = queueJNDI;
+ }
+ // Public --------------------------------------------------------
+ public void run()
+ {
+ super.run();
+ while (running)
+ {
+ try
+ {
+ beginTX();
+ for (int i = 0 ; i < 1000; i++)
+ {
+ Message msg = cons.receive(5000);
+ if (msg == null)
+ {
+ break;
+ }
+ if (msg.getLongProperty("count") != msgs + pendingMsgs)
+ {
+ errors++;
+ System.out.println("count should be " + (msgs + pendingMsgs) + " when it was " + msg.getLongProperty("count") + " on " + queueJNDI);
+ }
+ pendingMsgs++;
+ if (!minConsume.tryAcquire(1, 5, TimeUnit.SECONDS))
+ {
+ break;
+ }
+ }
+ endTX();
+ }
+ catch (Exception e)
+ {
+ connect();
+ }
+ }
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#connectClients()
+ */
+ @Override
+ protected void connectClients() throws Exception
+ {
+ queue = (Queue)ctx.lookup(queueJNDI);
+ cons = sess.createConsumer(queue);
+ conn.start();
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onCommit()
+ */
+ @Override
+ protected void onCommit()
+ {
+ msgs += pendingMsgs;
+ this.currentDiff.addAndGet(-pendingMsgs);
+ latchMax.countDown(pendingMsgs);
+ pendingMsgs = 0;
+ System.out.println("Commit on consumer " + queueJNDI + ", msgs=" + msgs + " currentDiff = " + currentDiff);
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onRollback()
+ */
+ @Override
+ protected void onRollback()
+ {
+ System.out.println("Rollback on consumer " + queueJNDI + ", msgs=" + msgs);
+ minConsume.release(pendingMsgs);
+ pendingMsgs = 0;
+ }
+ public String toString()
+ {
+ return "Receiver::" + this.queueJNDI + ", msgs=" + msgs + ", pending=" + pendingMsgs;
+ }
+ /**
+ * @param pendingMsgs2
+ */
+ public void messageProduced(int producedMessages)
+ {
+ minConsume.release(producedMessages);
+ currentDiff.addAndGet(producedMessages);
+ System.out.println("Msg produced on " + this.queueJNDI + ", currentDiff = " + currentDiff);
+ if (currentDiff.get() > MAX_DIFF)
+ {
+ System.out.println("Holding producer for 5 seconds");
+ latchMax.setCount(currentDiff.get() - MAX_DIFF);
+ try
+ {
+ latchMax.await(5, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ // Package protected ---------------------------------------------
+ // Protected -----------------------------------------------------
+ // Private -------------------------------------------------------
+ // Inner classes -------------------------------------------------
Added: trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java
--- trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java (rev 0)
+++ trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,129 @@
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+ * A Sender
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class Sender extends ClientAbstract
+ // Constants -----------------------------------------------------
+ // Attributes ----------------------------------------------------
+ protected MessageProducer producer;
+ protected Queue queue;
+ protected long msgs = TXRestartSoak.MIN_MESSAGES_ON_QUEUE;
+ protected int pendingMsgs = 0;
+ protected final Receiver[] receivers;
+ // Static --------------------------------------------------------
+ // Constructors --------------------------------------------------
+ // Public --------------------------------------------------------
+ public Sender(final Receiver[] receivers)
+ {
+ this.receivers = receivers;
+ }
+ @Override
+ protected void connectClients() throws Exception
+ {
+ queue = (Queue)ctx.lookup("/queue/inputQueue");
+ producer = sess.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ }
+ public void run()
+ {
+ super.run();
+ while (running)
+ {
+ try
+ {
+ beginTX();
+ for (int i = 0 ; i < 1000; i++)
+ {
+ BytesMessage msg = sess.createBytesMessage();
+ msg.setLongProperty("count", pendingMsgs + msgs);
+ msg.writeBytes(new byte[10 * 1024]);
+ producer.send(msg);
+ pendingMsgs++;
+ }
+ endTX();
+ }
+ catch (Exception e)
+ {
+ connect();
+ }
+ }
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onCommit()
+ */
+ @Override
+ protected void onCommit()
+ {
+ this.msgs += pendingMsgs;
+ for (Receiver rec : receivers)
+ {
+ rec.messageProduced(pendingMsgs);
+ }
+ pendingMsgs = 0;
+ System.out.println("commit on sender msgs = " + msgs );
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onRollback()
+ */
+ @Override
+ protected void onRollback()
+ {
+ pendingMsgs = 0;
+ System.out.println("Rolled back msgs=" + msgs);
+ }
+ public String toString()
+ {
+ return "Sender, msgs=" + msgs + ", pending=" + pendingMsgs;
+ }
+ // Package protected ---------------------------------------------
+ // Protected -----------------------------------------------------
+ // Private -------------------------------------------------------
+ // Inner classes -------------------------------------------------
Added: trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java
--- trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java (rev 0)
+++ trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,179 @@
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.InitialContext;
+import org.hornetq.common.example.HornetQExample;
+ *
+ * This is used as a soak test to verify HornetQ's capability of persistent messages over restarts.
+ *
+ * This is used as a smoke test before releases.
+ *
+ * WARNING: This is not a sample on how you should handle XA.
+ * You are supposed to use a TransactionManager.
+ * This class is doing the job of a TransactionManager that fits for the purpose of this test only,
+ * however there are many more pitfalls to deal with Transactions.
+ *
+ * This is just to stress and soak test Transactions with HornetQ.
+ *
+ * And this is dealing with XA directly for the purpose testing only.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class TXRestartSoak extends HornetQExample
+ public static final int MIN_MESSAGES_ON_QUEUE = 50000;
+ private static final Logger log = Logger.getLogger(TXRestartSoak.class.getName());
+ public static void main(final String[] args)
+ {
+ new TXRestartSoak().run(args);
+ }
+ private TXRestartSoak()
+ {
+ super();
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.common.example.HornetQExample#runExample()
+ */
+ @Override
+ public boolean runExample() throws Exception
+ {
+ Connection connection = null;
+ InitialContext initialContext = null;
+ try
+ {
+ // Step 1. Create an initial context to perform the JNDI lookup.
+ initialContext = getContext(0);
+ ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+ // Step 4. Create the JMS objects
+ connection = cf.createConnection();
+ // Step 2. Perfom a lookup on the queue
+ Queue queue = (Queue)initialContext.lookup("/queue/inputQueue");
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(queue);
+ for (int i = 0 ; i < MIN_MESSAGES_ON_QUEUE; i++)
+ {
+ BytesMessage msg = session.createBytesMessage();
+ msg.setLongProperty("count", i);
+ msg.writeBytes(new byte[10 * 1024]);
+ producer.send(msg);
+ if (i % 1000 == 0)
+ {
+ System.out.println("Sent " + i + " messages");
+ session.commit();
+ }
+ }
+ session.commit();
+ Receiver rec1 = new Receiver("/queue/diverted1");
+ Receiver rec2 = new Receiver("/queue/diverted2");
+ Sender send = new Sender(new Receiver[]{rec1, rec2});
+ send.start();
+ rec1.start();
+ rec2.start();
+ long timeEnd = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
+ if (runServer)
+ {
+ while (timeEnd > System.currentTimeMillis())
+ {
+ System.out.println("Letting the service run for 20 seconds");
+ Thread.sleep(TimeUnit.SECONDS.toMillis(20));
+ stopServers();
+ Thread.sleep(10000);
+ boolean disconnected = false;
+ if (send.getErrorsCount() != 0 || rec1.getErrorsCount() != 0 || rec2.getErrorsCount() != 0)
+ {
+ System.out.println("There are sequence errors in some of the clients, please look at the logs");
+ break;
+ }
+ while (!disconnected)
+ {
+ disconnected = send.getConnection() == null && rec1.getConnection() == null && rec2.getConnection() == null;
+ if (!disconnected)
+ {
+ }
+ Thread.sleep(1000);
+ }
+ startServers();
+ }
+ }
+ else
+ {
+ while (timeEnd > System.currentTimeMillis())
+ {
+ if (send.getErrorsCount() != 0 || rec1.getErrorsCount() != 0 || rec2.getErrorsCount() != 0)
+ {
+ System.out.println("There are sequence errors in some of the clients, please look at the logs");
+ break;
+ }
+ Thread.sleep(10000);
+ }
+ }
+ send.setRunning(false);
+ rec1.setRunning(false);
+ rec2.setRunning(false);
+ send.join();
+ rec1.join();
+ rec2.join();
+ return send.getErrorsCount() == 0 && rec1.getErrorsCount() == 0 && rec2.getErrorsCount() == 0;
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -77,6 +77,11 @@
Element e = org.hornetq.utils.XMLUtil.stringToElement(xml);
FileConfigurationParser parser = new FileConfigurationParser();
+ // https://jira.jboss.org/browse/HORNETQ-478 - We only want to validate AIO when
+ // starting the server
+ // and we don't want to do it when deploying hornetq-queues.xml which uses the same parser and XML format
+ parser.setValidateAIO(true);
parser.parseMainConfig(e, this);
Modified: trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
--- trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -123,13 +123,31 @@
private static final String SEND_TO_DLA_ON_NO_ROUTE = "send-to-dla-on-no-route";
// Attributes ----------------------------------------------------
+ private boolean validateAIO = false;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+ /**
+ * @return the validateAIO
+ */
+ public boolean isValidateAIO()
+ {
+ return validateAIO;
+ }
+ /**
+ * @param validateAIO the validateAIO to set
+ */
+ public void setValidateAIO(boolean validateAIO)
+ {
+ this.validateAIO = validateAIO;
+ }
public Configuration parseMainConfig(final InputStream input) throws Exception
@@ -431,7 +449,10 @@
- log.warn("AIO wasn't located on this platform, it will fall back to using pure Java NIO. If your platform is Linux, install LibAIO to enable the AIO journal");
+ if (validateAIO)
+ {
+ log.warn("AIO wasn't located on this platform, it will fall back to using pure Java NIO. If your platform is Linux, install LibAIO to enable the AIO journal");
+ }
Modified: trunk/src/main/org/hornetq/core/journal/TestableJournal.java
--- trunk/src/main/org/hornetq/core/journal/TestableJournal.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/TestableJournal.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -49,8 +49,6 @@
void forceMoveNextFile() throws Exception;
- void forceMoveNextFile(boolean synchronous) throws Exception;
void setAutoReclaim(boolean autoReclaim);
boolean isAutoReclaim();
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -279,8 +279,8 @@
aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
- public void writeInternal(ByteBuffer bytes) throws Exception
+ public void writeInternal(final ByteBuffer bytes) throws Exception
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
@@ -289,7 +289,6 @@
aioFile.writeInternal(positionToWrite, bytesToWrite, bytes);
// Protected methods
// -----------------------------------------------------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -146,7 +146,8 @@
pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
- true, getThisClassLoader()));
+ true,
+ AIOSequentialFileFactory.getThisClassLoader()));
@@ -295,18 +296,17 @@
private static ClassLoader getThisClassLoader()
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
- return ClientSessionFactoryImpl.class.getClassLoader();
- }
- });
+ {
+ public ClassLoader run()
+ {
+ return ClientSessionFactoryImpl.class.getClassLoader();
+ }
+ });
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -55,6 +55,8 @@
protected SequentialFile sequentialFile;
+ protected final JournalFilesRepository filesRepository;
protected long nextOrderingID;
private HornetQBuffer writingChannel;
@@ -69,11 +71,13 @@
protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory,
final JournalImpl journal,
+ final JournalFilesRepository filesRepository,
final Set<Long> recordsSnapshot,
final long nextOrderingID)
this.journal = journal;
+ this.filesRepository = filesRepository;
this.fileFactory = fileFactory;
this.nextOrderingID = nextOrderingID;
@@ -150,16 +154,14 @@
new ByteArrayEncoding(filesToRename.toByteBuffer()
HornetQBuffer renameBuffer = HornetQBuffers.dynamicBuffer(filesToRename.writerIndex());
ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
writeBuffer.put(renameBuffer.toByteBuffer().array(), 0, renameBuffer.writerIndex());
@@ -180,10 +182,10 @@
if (writingChannel != null)
// To Fix the size of the file
@@ -212,14 +214,14 @@
writingChannel = HornetQBuffers.wrappedBuffer(bufferWrite);
- currentFile = journal.getFile(false, false, false, true);
+ currentFile = filesRepository.takeFile(false, false, false, true);
sequentialFile = currentFile.getFile();
sequentialFile.open(1, false);
currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++, JournalImpl.FORMAT_VERSION);
JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -117,7 +117,8 @@
if (isSupportsCallbacks())
writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this),
- true, getThisClassLoader()));
+ true,
+ AbstractSequentialFileFactory.getThisClassLoader()));
@@ -193,14 +194,13 @@
private static ClassLoader getThisClassLoader()
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
- return ClientSessionFactoryImpl.class.getClassLoader();
- }
- });
+ {
+ public ClassLoader run()
+ {
+ return ClientSessionFactoryImpl.class.getClassLoader();
+ }
+ });
Added: trunk/src/main/org/hornetq/core/journal/impl/CompactJournal.java
--- trunk/src/main/org/hornetq/core/journal/impl/CompactJournal.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/CompactJournal.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,76 @@
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.journal.impl;
+ * This is an undocumented class, that will open a journal and force compacting on it.
+ * It may be used under special cases, but it shouldn't be needed under regular circumstances as the system should detect
+ * the need for compacting.
+ *
+ * The regular use is to configure min-compact parameters.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class CompactJournal
+ public static void main(final String arg[])
+ {
+ if (arg.length != 4)
+ {
+ System.err.println("Use: java -cp hornetq-core.jar org.hornetq.core.journal.impl.CompactJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize>");
+ return;
+ }
+ try
+ {
+ CompactJournal.compactJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]));
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ public static void compactJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize) throws Exception
+ {
+ NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+ JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
+ journal.start();
+ journal.loadInternalOnly();
+ journal.compact();
+ journal.stop();
+ }
+ // Package protected ---------------------------------------------
+ // Protected -----------------------------------------------------
+ // Private -------------------------------------------------------
+ // Inner classes -------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java
--- trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -19,8 +19,7 @@
import java.io.PrintStream;
import java.util.List;
-import org.hornetq.core.journal.*;
+import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.utils.Base64;
@@ -48,7 +47,7 @@
// Public --------------------------------------------------------
- public static void main(String arg[])
+ public static void main(final String arg[])
if (arg.length != 5)
@@ -58,7 +57,7 @@
- exportJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
+ ExportJournal.exportJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
catch (Exception e)
@@ -67,32 +66,31 @@
- public static void exportJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- String fileOutput) throws Exception
+ public static void exportJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final String fileOutput) throws Exception
FileOutputStream fileOut = new FileOutputStream(new File(fileOutput));
BufferedOutputStream buffOut = new BufferedOutputStream(fileOut);
PrintStream out = new PrintStream(buffOut);
- exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
+ ExportJournal.exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
- public static void exportJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- PrintStream out) throws Exception
+ public static void exportJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final PrintStream out) throws Exception
NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
@@ -104,7 +102,7 @@
out.println("#File," + file);
- exportJournalFile(out, nio, file);
+ ExportJournal.exportJournalFile(out, nio, file);
@@ -114,67 +112,71 @@
* @param file
* @throws Exception
- public static void exportJournalFile(final PrintStream out, SequentialFileFactory fileFactory, JournalFile file) throws Exception
+ public static void exportJournalFile(final PrintStream out,
+ final SequentialFileFactory fileFactory,
+ final JournalFile file) throws Exception
JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
- public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
- out.println("operation@UpdateTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+ out.println("operation@UpdateTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
- public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
+ public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
- out.println("operation@Update," + describeRecord(recordInfo));
+ out.println("operation@Update," + ExportJournal.describeRecord(recordInfo));
- public void onReadRollbackRecord(long transactionID) throws Exception
+ public void onReadRollbackRecord(final long transactionID) throws Exception
out.println("operation@Rollback,txID@" + transactionID);
- public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+ public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
out.println("operation@Prepare,txID@" + transactionID +
",numberOfRecords@" +
numberOfRecords +
",extraData@" +
- encode(extraData));
+ ExportJournal.encode(extraData));
- public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
- out.println("operation@DeleteRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+ out.println("operation@DeleteRecordTX,txID@" + transactionID +
+ "," +
+ ExportJournal.describeRecord(recordInfo));
- public void onReadDeleteRecord(long recordID) throws Exception
+ public void onReadDeleteRecord(final long recordID) throws Exception
out.println("operation@DeleteRecord,id@" + recordID);
- public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+ public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
out.println("operation@Commit,txID@" + transactionID + ",numberOfRecords@" + numberOfRecords);
- public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
- out.println("operation@AddRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+ out.println("operation@AddRecordTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
- public void onReadAddRecord(RecordInfo recordInfo) throws Exception
+ public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
- out.println("operation@AddRecord," + describeRecord(recordInfo));
+ out.println("operation@AddRecord," + ExportJournal.describeRecord(recordInfo));
- public void markAsDataFile(JournalFile file)
+ public void markAsDataFile(final JournalFile file)
- private static String describeRecord(RecordInfo recordInfo)
+ private static String describeRecord(final RecordInfo recordInfo)
return "id@" + recordInfo.id +
",userRecordType@" +
@@ -186,10 +188,10 @@
",compactCount@" +
recordInfo.compactCount +
",data@" +
- encode(recordInfo.data);
+ ExportJournal.encode(recordInfo.data);
- private static String encode(byte[] data)
+ private static String encode(final byte[] data)
return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
Modified: trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
--- trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -25,7 +25,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.impl.JournalRecord;
import org.hornetq.utils.Base64;
@@ -52,7 +51,7 @@
// Public --------------------------------------------------------
- public static void main(String arg[])
+ public static void main(final String arg[])
if (arg.length != 5)
@@ -62,7 +61,7 @@
- importJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
+ ImportJournal.importJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
catch (Exception e)
@@ -71,34 +70,35 @@
- public static void importJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- String fileInput) throws Exception
+ public static void importJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final String fileInput) throws Exception
FileInputStream fileInputStream = new FileInputStream(new File(fileInput));
- importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
+ ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
- public static void importJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- InputStream stream) throws Exception
+ public static void importJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final InputStream stream) throws Exception
Reader reader = new InputStreamReader(stream);
- importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
+ ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
- public static void importJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- Reader reader) throws Exception
+ public static void importJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final Reader reader) throws Exception
File journalDir = new File(directory);
@@ -139,7 +139,7 @@
- Properties lineProperties = parseLine(splitLine);
+ Properties lineProperties = ImportJournal.parseLine(splitLine);
String operation = null;
@@ -148,67 +148,67 @@
if (operation.equals("AddRecord"))
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendAddRecord(info.id, info.userRecordType, info.data, false);
else if (operation.equals("AddRecordTX"))
- long txID = parseLong("txID", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
else if (operation.equals("AddRecordTX"))
- long txID = parseLong("txID", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
else if (operation.equals("UpdateTX"))
- long txID = parseLong("txID", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendUpdateRecordTransactional(txID, info.id, info.userRecordType, info.data);
else if (operation.equals("Update"))
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
else if (operation.equals("DeleteRecord"))
- long id = parseLong("id", lineProperties);
+ long id = ImportJournal.parseLong("id", lineProperties);
// If not found it means the append/update records were reclaimed already
- if (journalRecords.get((Long)id) != null)
+ if (journalRecords.get(id) != null)
journal.appendDeleteRecord(id, false);
else if (operation.equals("DeleteRecordTX"))
- long txID = parseLong("txID", lineProperties);
- long id = parseLong("id", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ long id = ImportJournal.parseLong("id", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
// If not found it means the append/update records were reclaimed already
- if (journalRecords.get((Long)id) != null)
+ if (journalRecords.get(id) != null)
journal.appendDeleteRecordTransactional(txID, id);
else if (operation.equals("Prepare"))
- long txID = parseLong("txID", lineProperties);
- int numberOfRecords = parseInt("numberOfRecords", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
- byte[] data = parseEncoding("extraData", lineProperties);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
+ byte[] data = ImportJournal.parseEncoding("extraData", lineProperties);
if (counter.get() == numberOfRecords)
@@ -227,9 +227,9 @@
else if (operation.equals("Commit"))
- long txID = parseLong("txID", lineProperties);
- int numberOfRecords = parseInt("numberOfRecords", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
if (counter.get() == numberOfRecords)
journal.appendCommitRecord(txID, false);
@@ -247,7 +247,7 @@
else if (operation.equals("Rollback"))
- long txID = parseLong("txID", lineProperties);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
journal.appendRollbackRecord(txID, false);
@@ -264,7 +264,7 @@
- protected static AtomicInteger getCounter(Long txID, Map<Long, AtomicInteger> txCounters)
+ protected static AtomicInteger getCounter(final Long txID, final Map<Long, AtomicInteger> txCounters)
AtomicInteger counter = txCounters.get(txID);
@@ -277,50 +277,50 @@
return counter;
- protected static RecordInfo parseRecord(Properties properties) throws Exception
+ protected static RecordInfo parseRecord(final Properties properties) throws Exception
- long id = parseLong("id", properties);
- byte userRecordType = parseByte("userRecordType", properties);
- boolean isUpdate = parseBoolean("isUpdate", properties);
- byte[] data = parseEncoding("data", properties);
+ long id = ImportJournal.parseLong("id", properties);
+ byte userRecordType = ImportJournal.parseByte("userRecordType", properties);
+ boolean isUpdate = ImportJournal.parseBoolean("isUpdate", properties);
+ byte[] data = ImportJournal.parseEncoding("data", properties);
return new RecordInfo(id, userRecordType, data, isUpdate, (short)0);
- private static byte[] parseEncoding(String name, Properties properties) throws Exception
+ private static byte[] parseEncoding(final String name, final Properties properties) throws Exception
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
- return decode(value);
+ return ImportJournal.decode(value);
* @param properties
* @return
- private static int parseInt(String name, Properties properties) throws Exception
+ private static int parseInt(final String name, final Properties properties) throws Exception
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
return Integer.parseInt(value);
- private static long parseLong(String name, Properties properties) throws Exception
+ private static long parseLong(final String name, final Properties properties) throws Exception
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
return Long.parseLong(value);
- private static boolean parseBoolean(String name, Properties properties) throws Exception
+ private static boolean parseBoolean(final String name, final Properties properties) throws Exception
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
return Boolean.parseBoolean(value);
- private static byte parseByte(String name, Properties properties) throws Exception
+ private static byte parseByte(final String name, final Properties properties) throws Exception
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
return Byte.parseByte(value);
@@ -331,7 +331,7 @@
* @return
* @throws Exception
- private static String parseString(String name, Properties properties) throws Exception
+ private static String parseString(final String name, final Properties properties) throws Exception
String value = properties.getProperty(name);
@@ -342,7 +342,7 @@
return value;
- protected static Properties parseLine(String[] splitLine)
+ protected static Properties parseLine(final String[] splitLine)
Properties properties = new Properties();
@@ -362,7 +362,7 @@
return properties;
- private static byte[] decode(String data)
+ private static byte[] decode(final String data)
return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -47,10 +47,10 @@
private static final Logger log = Logger.getLogger(JournalCompactor.class);
// We try to separate old record from new ones when doing the compacting
// this is a split line
- // We will force a moveNextFiles when the compactCount is bellow than COMPACT_SPLIT_LINE
+ // We will force a moveNextFiles when the compactCount is bellow than COMPACT_SPLIT_LINE
private final short COMPACT_SPLIT_LINE = 2;
// Snapshot of transactions that were pending when the compactor started
@@ -144,10 +144,11 @@
public JournalCompactor(final SequentialFileFactory fileFactory,
final JournalImpl journal,
+ final JournalFilesRepository filesRepository,
final Set<Long> recordsSnapshot,
final long firstFileID)
- super(fileFactory, journal, recordsSnapshot, firstFileID);
+ super(fileFactory, journal, filesRepository, recordsSnapshot, firstFileID);
/** This methods informs the Compactor about the existence of a pending (non committed) transaction */
@@ -216,7 +217,7 @@
checkSize(size, -1);
private void checkSize(final int size, final int compactCount) throws Exception
if (getWritingChannel() == null)
@@ -238,17 +239,19 @@
if (getWritingChannel().writerIndex() + size > getWritingChannel().capacity())
int currentCount;
// This means we will need to split when the compactCount is bellow the watermark
boolean willNeedToSplit = false;
boolean splitted = false;
private boolean checkCompact(final int compactCount) throws Exception
@@ -257,7 +260,7 @@
willNeedToSplit = true;
if (willNeedToSplit && compactCount < COMPACT_SPLIT_LINE)
willNeedToSplit = false;
@@ -270,8 +273,6 @@
return false;
* Replay pending counts that happened during compacting
@@ -304,7 +305,7 @@
new ByteArrayEncoding(info.data));
addRecord.setCompactCount((short)(info.compactCount + 1));
checkSize(addRecord.getEncodeSize(), info.compactCount);
@@ -326,7 +327,7 @@
new ByteArrayEncoding(info.data));
record.setCompactCount((short)(info.compactCount + 1));
checkSize(record.getEncodeSize(), info.compactCount);
newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
@@ -346,15 +347,15 @@
- JournalTransaction newTransaction = newTransactions.remove(transactionID);
+ JournalTransaction newTransaction = newTransactions.remove(transactionID);
if (newTransaction != null)
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(true, transactionID, null);
writeEncoder(commitRecord, newTransaction.getCounter(currentFile));
@@ -365,7 +366,8 @@
if (newRecords.get(recordID) != null)
// Sanity check, it should never happen
- throw new IllegalStateException("Inconsistency during compacting: Delete record being read on an existent record (id=" + recordID + ")");
+ throw new IllegalStateException("Inconsistency during compacting: Delete record being read on an existent record (id=" + recordID +
+ ")");
@@ -427,16 +429,16 @@
JournalTransaction newTransaction = newTransactions.remove(transactionID);
if (newTransaction != null)
JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(transactionID);
@@ -450,7 +452,7 @@
new ByteArrayEncoding(info.data));
updateRecord.setCompactCount((short)(info.compactCount + 1));
checkSize(updateRecord.getEncodeSize(), info.compactCount);
JournalRecord newRecord = newRecords.get(info.id);
@@ -482,7 +484,7 @@
new ByteArrayEncoding(info.data));
updateRecordTX.setCompactCount((short)(info.compactCount + 1));
checkSize(updateRecordTX.getEncodeSize(), info.compactCount);
@@ -531,7 +533,14 @@
void execute() throws Exception
JournalRecord deleteRecord = journal.getRecords().remove(id);
- deleteRecord.delete(usedFile);
+ if (deleteRecord == null)
+ {
+ JournalCompactor.log.warn("Can't find record " + id + " during compact replay");
+ }
+ else
+ {
+ deleteRecord.delete(usedFile);
+ }
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -45,7 +45,7 @@
void decSize(int bytes);
int getLiveSize();
/** The total number of deletes this file has */
int getTotalNegativeToOthers();
@@ -58,9 +58,9 @@
/** This is a field to identify that records on this file actually belong to the current file.
* The possible implementation for this is fileID & Integer.MAX_VALUE */
int getRecordID();
long getFileID();
int getJournalVersion();
SequentialFile getFile();
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -47,7 +47,7 @@
private boolean canReclaim;
- private AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
+ private final AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
private final int version;
@@ -61,7 +61,7 @@
this.version = version;
- this.recordID = (int)(fileID & (long)Integer.MAX_VALUE);
+ recordID = (int)(fileID & Integer.MAX_VALUE);
public void clearCounts()
@@ -165,7 +165,7 @@
- return "JournalFileImpl: (" + file.getFileName() + " id = " + this.fileID + ", recordID = " + recordID + ")";
+ return "JournalFileImpl: (" + file.getFileName() + " id = " + fileID + ", recordID = " + recordID + ")";
catch (Exception e)
Added: trunk/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,593 @@
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.journal.impl;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+ * This is a helper class for the Journal, which will control access to dataFiles, openedFiles and freeFiles
+ * Guaranteeing that they will be delivered in order to the Journal
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalFilesRepository
+ private static final Logger log = Logger.getLogger(JournalFilesRepository.class);
+ private static final boolean trace = JournalFilesRepository.log.isTraceEnabled();
+ // This method exists just to make debug easier.
+ // I could replace log.trace by log.info temporarily while I was debugging
+ // Journal
+ private static final void trace(final String message)
+ {
+ JournalFilesRepository.log.trace(message);
+ }
+ // Constants -----------------------------------------------------
+ // Attributes ----------------------------------------------------
+ private final SequentialFileFactory fileFactory;
+ private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
+ private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
+ private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
+ private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
+ private final AtomicLong nextFileID = new AtomicLong(0);
+ private final int maxAIO;
+ private final int minFiles;
+ private final int fileSize;
+ private final String filePrefix;
+ private final String fileExtension;
+ private final int userVersion;
+ private Executor filesExecutor;
+ // Static --------------------------------------------------------
+ // Constructors --------------------------------------------------
+ public JournalFilesRepository(final SequentialFileFactory fileFactory,
+ final String filePrefix,
+ final String fileExtension,
+ final int userVersion,
+ final int maxAIO,
+ final int fileSize,
+ final int minFiles)
+ {
+ this.fileFactory = fileFactory;
+ this.maxAIO = maxAIO;
+ this.filePrefix = filePrefix;
+ this.fileExtension = fileExtension;
+ this.minFiles = minFiles;
+ this.fileSize = fileSize;
+ this.userVersion = userVersion;
+ }
+ // Public --------------------------------------------------------
+ public void setExecutor(final Executor executor)
+ {
+ filesExecutor = executor;
+ }
+ public void clear()
+ {
+ dataFiles.clear();
+ drainClosedFiles();
+ freeFiles.clear();
+ for (JournalFile file : openedFiles)
+ {
+ try
+ {
+ file.getFile().close();
+ }
+ catch (Exception e)
+ {
+ JournalFilesRepository.log.warn(e.getMessage(), e);
+ }
+ }
+ openedFiles.clear();
+ }
+ public int getMaxAIO()
+ {
+ return maxAIO;
+ }
+ public String getFileExtension()
+ {
+ return fileExtension;
+ }
+ public String getFilePrefix()
+ {
+ return filePrefix;
+ }
+ public void calculateNextfileID(final List<JournalFile> files)
+ {
+ for (JournalFile file : files)
+ {
+ long fileID = file.getFileID();
+ if (nextFileID.get() < fileID)
+ {
+ nextFileID.set(fileID);
+ }
+ long fileNameID = getFileNameID(file.getFile().getFileName());
+ // The compactor could create a fileName but use a previously assigned ID.
+ // Because of that we need to take both parts into account
+ if (nextFileID.get() < fileNameID)
+ {
+ nextFileID.set(fileNameID);
+ }
+ }
+ }
+ public void ensureMinFiles() throws Exception
+ {
+ // FIXME - size() involves a scan
+ int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
+ if (filesToCreate > 0)
+ {
+ for (int i = 0; i < filesToCreate; i++)
+ {
+ // Keeping all files opened can be very costly (mainly on AIO)
+ freeFiles.add(createFile(false, false, true, false));
+ }
+ }
+ }
+ public void openFile(final JournalFile file, final boolean multiAIO) throws Exception
+ {
+ if (multiAIO)
+ {
+ file.getFile().open();
+ }
+ else
+ {
+ file.getFile().open(1, false);
+ }
+ file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
+ }
+ // Data File Operations ==========================================
+ public JournalFile[] getDataFilesArray()
+ {
+ return dataFiles.toArray(new JournalFile[dataFiles.size()]);
+ }
+ public JournalFile pollLastDataFile()
+ {
+ return dataFiles.pollLast();
+ }
+ public void removeDataFile(final JournalFile file)
+ {
+ if (!dataFiles.remove(file))
+ {
+ JournalFilesRepository.log.warn("Could not remove file " + file + " from the list of data files");
+ }
+ }
+ public int getDataFilesCount()
+ {
+ return dataFiles.size();
+ }
+ public Collection<JournalFile> getDataFiles()
+ {
+ return dataFiles;
+ }
+ public void clearDataFiles()
+ {
+ dataFiles.clear();
+ }
+ public void addDataFileOnTop(final JournalFile file)
+ {
+ dataFiles.addFirst(file);
+ }
+ public void addDataFileOnBottom(final JournalFile file)
+ {
+ dataFiles.add(file);
+ }
+ // Free File Operations ==========================================
+ public int getFreeFilesCount()
+ {
+ return freeFiles.size();
+ }
+ /**
+ * Add directly to the freeFiles structure without reinitializing the file.
+ * used on load() only
+ */
+ public void addFreeFileNoInit(final JournalFile file)
+ {
+ freeFiles.add(file);
+ }
+ /**
+ * @param file
+ * @throws Exception
+ */
+ public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
+ {
+ if (file.getFile().size() != fileSize)
+ {
+ JournalFilesRepository.log.warn("Deleting " + file + ".. as it doesn't have the configured size");
+ file.getFile().delete();
+ }
+ else
+ // FIXME - size() involves a scan!!!
+ if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+ {
+ // Re-initialise it
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("Adding free file " + file);
+ }
+ JournalFile jf = reinitializeFile(file);
+ if (renameTmp)
+ {
+ jf.getFile().renameTo(JournalImpl.renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
+ }
+ freeFiles.add(jf);
+ }
+ else
+ {
+ file.getFile().delete();
+ }
+ }
+ public Collection<JournalFile> getFreeFiles()
+ {
+ return freeFiles;
+ }
+ public JournalFile getFreeFile()
+ {
+ return freeFiles.remove();
+ }
+ // Opened files operations =======================================
+ public int getOpenedFilesCount()
+ {
+ return openedFiles.size();
+ }
+ public void drainClosedFiles()
+ {
+ JournalFile file;
+ try
+ {
+ while ((file = pendingCloseFiles.poll()) != null)
+ {
+ file.getFile().close();
+ }
+ }
+ catch (Exception e)
+ {
+ JournalFilesRepository.log.warn(e.getMessage(), e);
+ }
+ }
+ /**
+ * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
+ * <p>In case there are no cached opened files, this method will block until the file was opened,
+ * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p>
+ * */
+ public JournalFile openFile() throws InterruptedException
+ {
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
+ }
+ Runnable run = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ pushOpenedFile();
+ }
+ catch (Exception e)
+ {
+ JournalFilesRepository.log.error(e.getMessage(), e);
+ }
+ }
+ };
+ if (filesExecutor == null)
+ {
+ run.run();
+ }
+ else
+ {
+ filesExecutor.execute(run);
+ }
+ JournalFile nextFile = null;
+ while (nextFile == null)
+ {
+ nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
+ if (nextFile == null)
+ {
+ JournalFilesRepository.log.warn("Couldn't open a file in 60 Seconds",
+ new Exception("Warning: Couldn't open a file in 60 Seconds"));
+ }
+ }
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("Returning file " + nextFile);
+ }
+ return nextFile;
+ }
+ /**
+ *
+ * Open a file and place it into the openedFiles queue
+ * */
+ public void pushOpenedFile() throws Exception
+ {
+ JournalFile nextOpenedFile = takeFile(true, true, true, false);
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("pushing openFile " + nextOpenedFile);
+ }
+ openedFiles.offer(nextOpenedFile);
+ }
+ public void closeFile(final JournalFile file)
+ {
+ fileFactory.deactivateBuffer();
+ pendingCloseFiles.add(file);
+ dataFiles.add(file);
+ Runnable run = new Runnable()
+ {
+ public void run()
+ {
+ drainClosedFiles();
+ }
+ };
+ if (filesExecutor == null)
+ {
+ run.run();
+ }
+ else
+ {
+ filesExecutor.execute(run);
+ }
+ }
+ /**
+ * This will get a File from freeFile without initializing it
+ * @return
+ * @throws Exception
+ */
+ public JournalFile takeFile(final boolean keepOpened,
+ final boolean multiAIO,
+ final boolean initFile,
+ final boolean tmpCompactExtension) throws Exception
+ {
+ JournalFile nextFile = null;
+ nextFile = freeFiles.poll();
+ if (nextFile == null)
+ {
+ nextFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
+ }
+ else
+ {
+ if (tmpCompactExtension)
+ {
+ SequentialFile sequentialFile = nextFile.getFile();
+ sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+ }
+ if (keepOpened)
+ {
+ openFile(nextFile, multiAIO);
+ }
+ }
+ return nextFile;
+ }
+ // Package protected ---------------------------------------------
+ // Protected -----------------------------------------------------
+ // Private -------------------------------------------------------
+ /**
+ * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
+ * @param keepOpened
+ * @return
+ * @throws Exception
+ */
+ private JournalFile createFile(final boolean keepOpened,
+ final boolean multiAIO,
+ final boolean init,
+ final boolean tmpCompact) throws Exception
+ {
+ long fileID = generateFileID();
+ String fileName;
+ fileName = createFileName(tmpCompact, fileID);
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("Creating file " + fileName);
+ }
+ String tmpFileName = fileName + ".tmp";
+ SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
+ sequentialFile.open(1, false);
+ if (init)
+ {
+ sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
+ JournalImpl.initFileHeader(fileFactory, sequentialFile, userVersion, fileID);
+ }
+ long position = sequentialFile.position();
+ sequentialFile.close();
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("Renaming file " + tmpFileName + " as " + fileName);
+ }
+ sequentialFile.renameTo(fileName);
+ if (keepOpened)
+ {
+ if (multiAIO)
+ {
+ sequentialFile.open();
+ }
+ else
+ {
+ sequentialFile.open(1, false);
+ }
+ sequentialFile.position(position);
+ }
+ return new JournalFileImpl(sequentialFile, fileID, JournalImpl.FORMAT_VERSION);
+ }
+ /**
+ * @param tmpCompact
+ * @param fileID
+ * @return
+ */
+ private String createFileName(final boolean tmpCompact, final long fileID)
+ {
+ String fileName;
+ if (tmpCompact)
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
+ }
+ else
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension;
+ }
+ return fileName;
+ }
+ private long generateFileID()
+ {
+ return nextFileID.incrementAndGet();
+ }
+ /** Get the ID part of the name */
+ private long getFileNameID(final String fileName)
+ {
+ try
+ {
+ return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
+ }
+ catch (Throwable e)
+ {
+ JournalFilesRepository.log.warn("Impossible to get the ID part of the file name " + fileName, e);
+ return 0;
+ }
+ }
+ // Discard the old JournalFile and set it with a new ID
+ private JournalFile reinitializeFile(final JournalFile file) throws Exception
+ {
+ long newFileID = generateFileID();
+ SequentialFile sf = file.getFile();
+ sf.open(1, false);
+ int position = JournalImpl.initFileHeader(fileFactory, sf, userVersion, newFileID);
+ JournalFile jf = new JournalFileImpl(sf, newFileID, JournalImpl.FORMAT_VERSION);
+ sf.position(position);
+ sf.close();
+ return jf;
+ }
+ // Inner classes -------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -24,16 +24,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -68,7 +63,6 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.DataConstants;
* <p>A circular log implementation.</p
@@ -91,23 +85,32 @@
private static final int STATE_LOADED = 2;
public static final int FORMAT_VERSION = 2;
- private static final int COMPATIBLE_VERSIONS[] = new int[] {1};
+ private static final int COMPATIBLE_VERSIONS[] = new int[] { 1 };
// Static --------------------------------------------------------
private static final Logger log = Logger.getLogger(JournalImpl.class);
- private static final boolean trace = log.isTraceEnabled();
+ private static final boolean trace = JournalImpl.log.isTraceEnabled();
+ // This is useful at debug time...
+ // if you set it to true, all the appends, deletes, rollbacks, commits, etc.. are sent to System.out
+ private static final boolean TRACE_RECORDS = false;
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
// Journal
private static final void trace(final String message)
- log.trace(message);
+ JournalImpl.log.trace(message);
+ private static final void traceRecord(final String message)
+ {
+ JournalImpl.log.trace(message);
+ }
// The sizes of primitive types
public static final int MIN_FILE_SIZE = 1024;
@@ -167,12 +170,8 @@
private volatile boolean autoReclaim = true;
- private final AtomicLong nextFileID = new AtomicLong(0);
private final int userVersion;
- private final int maxAIO;
private final int fileSize;
private final int minFiles;
@@ -183,18 +182,8 @@
private final SequentialFileFactory fileFactory;
- public final String filePrefix;
+ private final JournalFilesRepository filesRepository;
- public final String fileExtension;
- private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
- private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
- private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
- private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<Long, JournalRecord>();
@@ -301,12 +290,14 @@
this.fileFactory = fileFactory;
- this.filePrefix = filePrefix;
+ filesRepository = new JournalFilesRepository(fileFactory,
+ filePrefix,
+ fileExtension,
+ userVersion,
+ maxAIO,
+ fileSize,
+ minFiles);
- this.fileExtension = fileExtension;
- this.maxAIO = maxAIO;
this.userVersion = userVersion;
@@ -390,19 +381,19 @@
* It won't be part of the interface as the tools should be specific to the implementation */
public List<JournalFile> orderFiles() throws Exception
- List<String> fileNames = fileFactory.listFiles(fileExtension);
+ List<String> fileNames = fileFactory.listFiles(filesRepository.getFileExtension());
List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
for (String fileName : fileNames)
- SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
+ SequentialFile file = fileFactory.createSequentialFile(fileName, filesRepository.getMaxAIO());
file.open(1, false);
JournalFileImpl jrnFile = readFileHeader(file);
@@ -421,29 +412,6 @@
return orderedFiles;
- private void calculateNextfileID(List<JournalFile> files)
- {
- for (JournalFile file : files)
- {
- long fileID = file.getFileID();
- if (nextFileID.get() < fileID)
- {
- nextFileID.set(fileID);
- }
- long fileNameID = getFileNameID(file.getFile().getFileName());
- // The compactor could create a fileName but use a previously assigned ID.
- // Because of that we need to take both parts into account
- if (nextFileID.get() < fileNameID)
- {
- nextFileID.set(fileNameID);
- }
- }
- }
/** this method is used internally only however tools may use it to maintenance. */
public static int readJournalFile(final SequentialFileFactory fileFactory,
final JournalFile file,
@@ -505,20 +473,20 @@
short compactCount = 0;
if (file.getJournalVersion() >= 2)
if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_BYTE))
wholeFileBuffer.position(pos + 1);
compactCount = wholeFileBuffer.get();
long transactionID = 0;
if (JournalImpl.isTransaction(recordType))
@@ -714,19 +682,31 @@
- reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false, compactCount));
+ reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID,
+ userRecordType,
+ record,
+ false,
+ compactCount));
- reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true, compactCount));
+ reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID,
+ userRecordType,
+ record,
+ true,
+ compactCount));
- reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true, compactCount));
+ reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID,
+ (byte)0,
+ record,
+ true,
+ compactCount));
@@ -780,7 +760,7 @@
catch (Throwable e)
- log.warn(e.getMessage(), e);
+ JournalImpl.log.warn(e.getMessage(), e);
throw new Exception(e.getMessage(), e);
@@ -856,6 +836,15 @@
JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendAddRecord::id=" + id +
+ ", userRecordType=" +
+ recordType +
+ ", usedFile = " +
+ usedFile);
+ }
records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
@@ -932,6 +921,15 @@
JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendUpdateRecord::id=" + id +
+ ", userRecordType=" +
+ recordType +
+ ", usedFile = " +
+ usedFile);
+ }
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
if (jrnRecord == null)
@@ -1008,6 +1006,11 @@
JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
+ }
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
if (record == null)
@@ -1060,6 +1063,17 @@
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendAddRecordTransactional:txID=" + txID +
+ ",id=" +
+ id +
+ ", userRecordType=" +
+ recordType +
+ ", usedFile = " +
+ usedFile);
+ }
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
@@ -1104,6 +1118,17 @@
JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendUpdateRecordTransactional::txID=" + txID +
+ ",id=" +
+ id +
+ ", userRecordType=" +
+ recordType +
+ ", usedFile = " +
+ usedFile);
+ }
tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
@@ -1142,6 +1167,15 @@
JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendDeleteRecordTransactional::txID=" + txID +
+ ", id=" +
+ id +
+ ", usedFile = " +
+ usedFile);
+ }
tx.addNegative(usedFile, id);
@@ -1231,6 +1265,11 @@
JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
+ }
@@ -1304,6 +1343,11 @@
JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
+ }
@@ -1433,9 +1477,9 @@
private void checkDeleteSize()
// HORNETQ-482 - Flush deletes only if memory is critical
- if (recordsToDelete.size() > DELETE_FLUSH && (runtime.freeMemory() < (runtime.maxMemory() * 0.2)))
+ if (recordsToDelete.size() > DELETE_FLUSH && runtime.freeMemory() < runtime.maxMemory() * 0.2)
- log.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
+ JournalImpl.log.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
// Clean up when the list is too large, or it won't be possible to load large sets of files
// Done as part of JBMESSAGING-1678
Iterator<RecordInfo> iter = records.iterator();
@@ -1451,7 +1495,7 @@
- log.debug("flush delete done");
+ JournalImpl.log.debug("flush delete done");
@@ -1514,18 +1558,22 @@
throw new IllegalStateException("There is pending compacting operation");
- ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(dataFiles.size());
+ ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(filesRepository.getDataFilesCount());
boolean previousReclaimValue = autoReclaim;
- if (trace)
+ if (JournalImpl.trace)
JournalImpl.trace("Starting compacting operation on journal");
- JournalImpl.log.debug("Starting compacting operation on journal");
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("Starting compacting operation on journal");
+ }
// We need to guarantee that the journal is frozen for this short time
@@ -1543,22 +1591,26 @@
// We need to move to the next file, as we need a clear start for negatives and positives counts
- moveNextFile(true);
+ moveNextFile(false);
+ filesRepository.drainClosedFiles();
// Take the snapshots and replace the structures
- dataFilesToProcess.addAll(dataFiles);
+ dataFilesToProcess.addAll(filesRepository.getDataFiles());
- dataFiles.clear();
+ filesRepository.clearDataFiles();
- drainClosedFiles();
if (dataFilesToProcess.size() == 0)
- compactor = new JournalCompactor(fileFactory, this, records.keySet(), dataFilesToProcess.get(0).getFileID());
+ compactor = new JournalCompactor(fileFactory,
+ this,
+ filesRepository,
+ records.keySet(),
+ dataFilesToProcess.get(0).getFileID());
for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
@@ -1590,7 +1642,7 @@
catch (Throwable e)
- log.warn("Error on reading compacting for " + file);
+ JournalImpl.log.warn("Error on reading compacting for " + file);
throw new Exception("Error on reading compacting for " + file, e);
@@ -1632,12 +1684,12 @@
JournalImpl.trace("Adding file " + fileToAdd + " back as datafile");
- dataFiles.addFirst(fileToAdd);
+ filesRepository.addDataFileOnTop(fileToAdd);
- if (trace)
+ if (JournalImpl.trace)
- JournalImpl.trace("There are " + dataFiles.size() + " datafiles Now");
+ JournalImpl.trace("There are " + filesRepository.getDataFilesCount() + " datafiles Now");
// Replay pending commands (including updates, deletes and commits)
@@ -1666,7 +1718,7 @@
- log.warn("Couldn't find tx=" + newTransaction.getId() + " to merge after compacting");
+ JournalImpl.log.warn("Couldn't find tx=" + newTransaction.getId() + " to merge after compacting");
@@ -1679,11 +1731,16 @@
renameFiles(dataFilesToProcess, newDatafiles);
- if (trace)
+ if (JournalImpl.trace)
JournalImpl.log.debug("Finished compacting on journal");
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("Finished compacting on journal");
+ }
@@ -1753,21 +1810,15 @@
- dataFiles.clear();
+ filesRepository.clear();
- pendingCloseFiles.clear();
- freeFiles.clear();
- openedFiles.clear();
final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
final List<JournalFile> orderedFiles = orderFiles();
- calculateNextfileID(orderedFiles);
+ filesRepository.calculateNextfileID(orderedFiles);
int lastDataPos = JournalImpl.SIZE_HEADER;
@@ -1817,7 +1868,8 @@
// have been deleted
// just leaving some updates in this file
- posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact count
+ posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact
+ // count
@@ -1862,12 +1914,13 @@
if (tnp == null)
- tnp = new JournalTransaction(info.id, JournalImpl.this);
+ tnp = new JournalTransaction(transactionID, JournalImpl.this);
transactions.put(transactionID, tnp);
- tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact count
+ tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact
+ // count
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
@@ -2030,43 +2083,23 @@
if (hasData.get())
lastDataPos = resultLastPost;
- dataFiles.add(file);
+ filesRepository.addDataFileOnBottom(file);
// Empty dataFiles with no data
- freeFiles.add(file);
+ filesRepository.addFreeFileNoInit(file);
// Create any more files we need
- // FIXME - size() involves a scan
- int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
+ filesRepository.ensureMinFiles();
- if (filesToCreate > 0)
- {
- for (int i = 0; i < filesToCreate; i++)
- {
- // Keeping all files opened can be very costly (mainly on AIO)
- freeFiles.add(createFile(false, false, true, false));
- }
- }
+ // The current file is the last one that has data
- // The current file is the last one
+ currentFile = filesRepository.pollLastDataFile();
- Iterator<JournalFile> iter = dataFiles.iterator();
- while (iter.hasNext())
- {
- currentFile = iter.next();
- if (!iter.hasNext())
- {
- iter.remove();
- }
- }
if (currentFile != null)
@@ -2075,15 +2108,17 @@
- currentFile = freeFiles.remove();
+ currentFile = filesRepository.getFreeFile();
- openFile(currentFile, true);
+ filesRepository.openFile(currentFile, true);
- pushOpenedFile();
+ filesRepository.pushOpenedFile();
+ state = JournalImpl.STATE_LOADED;
for (TransactionHolder transaction : loadTransactions.values())
if (!transaction.prepared || transaction.invalid)
@@ -2091,19 +2126,9 @@
JournalImpl.log.warn("Uncommitted transaction with id " + transaction.transactionID +
" found and discarded");
- JournalTransaction transactionInfo = transactions.get(transaction.transactionID);
+ // I append a rollback record here, because otherwise compacting will be throwing messages because of unknown transactions
+ this.appendRollbackRecord(transaction.transactionID, false);
- if (transactionInfo == null)
- {
- throw new IllegalStateException("Cannot find tx " + transaction.transactionID);
- }
- // Reverse the refs
- transactionInfo.forget();
- // Remove the transactionInfo
- transactions.remove(transaction.transactionID);
@@ -2128,8 +2153,6 @@
- state = JournalImpl.STATE_LOADED;
return new JournalLoadInformation(records.size(), maxID.longValue());
@@ -2152,7 +2175,7 @@
- for (JournalFile file : dataFiles)
+ for (JournalFile file : filesRepository.getDataFiles())
if (file.isCanReclaim())
@@ -2162,12 +2185,9 @@
JournalImpl.trace("Reclaiming file " + file);
- if (!dataFiles.remove(file))
- {
- JournalImpl.log.warn("Could not remove file " + file);
- }
+ filesRepository.removeDataFile(file);
- addFreeFile(file, false);
+ filesRepository.addFreeFile(file, false);
@@ -2179,9 +2199,6 @@
return false;
- int deleteme = 0;
private boolean needsCompact() throws Exception
JournalFile[] dataFiles = getDataFiles();
@@ -2196,9 +2213,9 @@
long totalBytes = (long)dataFiles.length * (long)fileSize;
long compactMargin = (long)(totalBytes * compactPercentage);
- boolean needCompact = (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles);
+ boolean needCompact = totalLiveSize < compactMargin && dataFiles.length > compactMinFiles;
return needCompact;
@@ -2216,35 +2233,40 @@
- if (needsCompact())
+ if (!compactorRunning.get() && needsCompact())
- if (!compactorRunning.compareAndSet(false, true))
- {
- return;
- }
+ scheduleCompact();
+ }
+ }
- // We can't use the executor for the compacting... or we would dead lock because of file open and creation
- // operations (that will use the executor)
- compactorExecutor.execute(new Runnable()
+ private void scheduleCompact()
+ {
+ if (!compactorRunning.compareAndSet(false, true))
+ {
+ return;
+ }
+ // We can't use the executor for the compacting... or we would dead lock because of file open and creation
+ // operations (that will use the executor)
+ compactorExecutor.execute(new Runnable()
+ {
+ public void run()
- public void run()
- {
- try
- {
- JournalImpl.this.compact();
- }
- catch (Throwable e)
- {
- JournalImpl.log.error(e.getMessage(), e);
- }
- finally
- {
- compactorRunning.set(false);
- }
+ try
+ {
+ JournalImpl.this.compact();
- });
- }
+ catch (Throwable e)
+ {
+ JournalImpl.log.error(e.getMessage(), e);
+ }
+ finally
+ {
+ compactorRunning.set(false);
+ }
+ }
+ });
// TestableJournal implementation
@@ -2266,7 +2288,7 @@
StringBuilder builder = new StringBuilder();
- for (JournalFile file : dataFiles)
+ for (JournalFile file : filesRepository.getDataFiles())
builder.append("DataFile:" + file +
" posCounter = " +
@@ -2283,7 +2305,7 @@
- for (JournalFile file : freeFiles)
+ for (JournalFile file : filesRepository.getFreeFiles())
builder.append("FreeFile:" + file + "\n");
@@ -2302,8 +2324,6 @@
builder.append("CurrentFile: No current file at this point!");
- builder.append("#Opened Files:" + openedFiles.size());
return builder.toString();
@@ -2339,22 +2359,22 @@
public int getDataFilesCount()
- return dataFiles.size();
+ return filesRepository.getDataFilesCount();
public JournalFile[] getDataFiles()
- return (JournalFile[])dataFiles.toArray(new JournalFile[dataFiles.size()]);
+ return filesRepository.getDataFilesArray();
public int getFreeFilesCount()
- return freeFiles.size();
+ return filesRepository.getFreeFilesCount();
public int getOpenedFilesCount()
- return openedFiles.size();
+ return filesRepository.getOpenedFilesCount();
public int getIDMapSize()
@@ -2374,17 +2394,17 @@
public String getFilePrefix()
- return filePrefix;
+ return filesRepository.getFilePrefix();
public String getFileExtension()
- return fileExtension;
+ return filesRepository.getFileExtension();
public int getMaxAIO()
- return maxAIO;
+ return filesRepository.getMaxAIO();
public int getUserVersion()
@@ -2395,23 +2415,13 @@
// In some tests we need to force the journal to move to a next file
public void forceMoveNextFile() throws Exception
- forceMoveNextFile(true);
- }
- // In some tests we need to force the journal to move to a next file
- public void forceMoveNextFile(final boolean synchronous) throws Exception
- {
- moveNextFile(synchronous);
- if (autoReclaim && synchronous)
- {
- checkReclaimStatus();
- }
+ moveNextFile(false);
@@ -2448,7 +2458,7 @@
filesExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
- public Thread newThread(Runnable r)
+ public Thread newThread(final Runnable r)
return new Thread(r, "JournalImpl::FilesExecutor");
@@ -2457,12 +2467,14 @@
compactorExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
- public Thread newThread(Runnable r)
+ public Thread newThread(final Runnable r)
return new Thread(r, "JournalImpl::CompactorExecutor");
+ filesRepository.setExecutor(filesExecutor);
state = JournalImpl.STATE_STARTED;
@@ -2493,6 +2505,8 @@
+ filesRepository.setExecutor(null);
if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
JournalImpl.log.warn("Couldn't stop journal executor after 60 seconds");
@@ -2505,20 +2519,13 @@
- for (JournalFile file : openedFiles)
- {
- file.getFile().close();
- }
+ filesRepository.drainClosedFiles();
+ filesRepository.clear();
currentFile = null;
- dataFiles.clear();
- freeFiles.clear();
- openedFiles.clear();
@@ -2578,11 +2585,11 @@
- addFreeFile(file, false);
+ filesRepository.addFreeFile(file, false);
catch (Throwable e)
- log.warn("Error reinitializing file " + file, e);
+ JournalImpl.log.warn("Error reinitializing file " + file, e);
@@ -2596,7 +2603,7 @@
for (JournalFile file : newFiles)
- String newName = renameExtensionFile(file.getFile().getFileName(), ".cmp");
+ String newName = JournalImpl.renameExtensionFile(file.getFile().getFileName(), ".cmp");
@@ -2606,7 +2613,7 @@
* @param name
* @return
- private String renameExtensionFile(String name, String extension)
+ protected static String renameExtensionFile(String name, final String extension)
name = name.substring(0, name.lastIndexOf(extension));
return name;
@@ -2632,63 +2639,6 @@
// -----------------------------------------------------------------------------
- * @param file
- * @throws Exception
- */
- private void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
- {
- if (file.getFile().size() != this.getFileSize())
- {
- log.warn("Deleting " + file + ".. as it doesn't have the configured size", new Exception("trace"));
- file.getFile().delete();
- }
- else
- // FIXME - size() involves a scan!!!
- if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
- {
- // Re-initialise it
- if (trace)
- {
- trace("Adding free file " + file);
- }
- JournalFile jf = reinitializeFile(file);
- if (renameTmp)
- {
- jf.getFile().renameTo(renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
- }
- freeFiles.add(jf);
- }
- else
- {
- file.getFile().delete();
- }
- }
- // Discard the old JournalFile and set it with a new ID
- private JournalFile reinitializeFile(final JournalFile file) throws Exception
- {
- long newFileID = generateFileID();
- SequentialFile sf = file.getFile();
- sf.open(1, false);
- int position = initFileHeader(this.fileFactory, sf, userVersion, newFileID);
- JournalFile jf = new JournalFileImpl(sf, newFileID, FORMAT_VERSION);
- sf.position(position);
- sf.close();
- return jf;
- }
- /**
* <p> Check for holes on the transaction (a commit written but with an incomplete transaction) </p>
* <p>This method will validate if the transaction (PREPARE/COMMIT) is complete as stated on the COMMIT-RECORD.</p>
@@ -2779,29 +2729,30 @@
* @return
* @throws Exception
- private JournalFileImpl readFileHeader(SequentialFile file) throws Exception
+ private JournalFileImpl readFileHeader(final SequentialFile file) throws Exception
ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
int journalVersion = bb.getInt();
- if (journalVersion != FORMAT_VERSION)
+ if (journalVersion != JournalImpl.FORMAT_VERSION)
boolean isCompatible = false;
+ for (int v : JournalImpl.COMPATIBLE_VERSIONS)
if (v == journalVersion)
isCompatible = true;
if (!isCompatible)
- throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch. You should export the data from the previous version and import it as explained on the user's manual");
+ throw new HornetQException(HornetQException.IO_ERROR,
+ "Journal files version mismatch. You should export the data from the previous version and import it as explained on the user's manual");
@@ -2817,7 +2768,7 @@
bb = null;
return new JournalFileImpl(file, fileID, journalVersion);
@@ -2836,7 +2787,7 @@
HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bb);
- writeHeader(buffer, userVersion, fileID);
+ JournalImpl.writeHeader(buffer, userVersion, fileID);
@@ -2854,9 +2805,9 @@
* @param userVersion
* @param fileID
- public static void writeHeader(HornetQBuffer buffer, final int userVersion, final long fileID)
+ public static void writeHeader(final HornetQBuffer buffer, final int userVersion, final long fileID)
- buffer.writeInt(FORMAT_VERSION);
+ buffer.writeInt(JournalImpl.FORMAT_VERSION);
@@ -2890,7 +2841,7 @@
if (!currentFile.getFile().fits(size))
- moveNextFile(false);
+ moveNextFile(true);
// The same check needs to be done at the new file also
if (!currentFile.getFile().fits(size))
@@ -2960,196 +2911,26 @@
return currentFile;
- /** Get the ID part of the name */
- private long getFileNameID(final String fileName)
+ // You need to guarantee lock.acquire() before calling this method
+ private void moveNextFile(final boolean scheduleReclaim) throws InterruptedException
- try
- {
- return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
- }
- catch (Throwable e)
- {
- JournalImpl.log.warn("Impossible to get the ID part of the file name " + fileName, e);
- return 0;
- }
- }
+ filesRepository.closeFile(currentFile);
- /**
- * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
- * @param keepOpened
- * @return
- * @throws Exception
- */
- private JournalFile createFile(final boolean keepOpened,
- final boolean multiAIO,
- final boolean init,
- final boolean tmpCompact) throws Exception
- {
- long fileID = generateFileID();
+ currentFile = filesRepository.openFile();
- String fileName;
- fileName = createFileName(tmpCompact, fileID);
- if (JournalImpl.trace)
+ if (scheduleReclaim)
- JournalImpl.trace("Creating file " + fileName);
+ scheduleReclaim();
- String tmpFileName = fileName + ".tmp";
- SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
- sequentialFile.open(1, false);
- if (init)
- {
- sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
- initFileHeader(this.fileFactory, sequentialFile, userVersion, fileID);
- }
- long position = sequentialFile.position();
- sequentialFile.close();
if (JournalImpl.trace)
- JournalImpl.trace("Renaming file " + tmpFileName + " as " + fileName);
+ JournalImpl.trace("moveNextFile: " + currentFile);
- sequentialFile.renameTo(fileName);
- if (keepOpened)
- {
- if (multiAIO)
- {
- sequentialFile.open();
- }
- else
- {
- sequentialFile.open(1, false);
- }
- sequentialFile.position(position);
- }
- return new JournalFileImpl(sequentialFile, fileID, FORMAT_VERSION);
- }
- /**
- * @param tmpCompact
- * @param fileID
- * @return
- */
- private String createFileName(final boolean tmpCompact, long fileID)
- {
- String fileName;
- if (tmpCompact)
- {
- fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
- }
- else
- {
- fileName = filePrefix + "-" + fileID + "." + fileExtension;
- }
- return fileName;
- }
- private void openFile(final JournalFile file, final boolean multiAIO) throws Exception
- {
- if (multiAIO)
- {
- file.getFile().open();
- }
- else
- {
- file.getFile().open(1, false);
- }
- file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
- }
- private long generateFileID()
- {
- return nextFileID.incrementAndGet();
- }
- // You need to guarantee lock.acquire() before calling this method
- private void moveNextFile(final boolean synchronous) throws InterruptedException
- {
- closeFile(currentFile, synchronous);
- currentFile = enqueueOpenFile(synchronous);
- if (JournalImpl.trace)
- {
- JournalImpl.trace("moveNextFile: " + currentFile + " sync: " + synchronous);
- }
- /**
- * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
- * <p>In case there are no cached opened files, this method will block until the file was opened,
- * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p>
- * */
- private JournalFile enqueueOpenFile(final boolean synchronous) throws InterruptedException
- {
- if (JournalImpl.trace)
- {
- JournalImpl.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
- }
- Runnable run = new Runnable()
- {
- public void run()
- {
- try
- {
- pushOpenedFile();
- }
- catch (Exception e)
- {
- JournalImpl.log.error(e.getMessage(), e);
- }
- }
- };
- if (synchronous)
- {
- run.run();
- }
- else
- {
- filesExecutor.execute(run);
- }
- if (!synchronous)
- {
- scheduleReclaim();
- }
- JournalFile nextFile = null;
- while (nextFile == null)
- {
- nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
- if (nextFile == null)
- {
- JournalImpl.log.warn("Couldn't open a file in 60 Seconds",
- new Exception("Warning: Couldn't open a file in 60 Seconds"));
- }
- }
- if (trace)
- {
- JournalImpl.trace("Returning file " + nextFile);
- }
- return nextFile;
- }
private void scheduleReclaim()
if (state != JournalImpl.STATE_LOADED)
@@ -3165,7 +2946,7 @@
- drainClosedFiles();
+ filesRepository.drainClosedFiles();
if (!checkReclaimStatus())
@@ -3180,97 +2961,6 @@
- /**
- *
- * Open a file and place it into the openedFiles queue
- * */
- private void pushOpenedFile() throws Exception
- {
- JournalFile nextOpenedFile = getFile(true, true, true, false);
- if (trace)
- {
- JournalImpl.trace("pushing openFile " + nextOpenedFile);
- }
- openedFiles.offer(nextOpenedFile);
- }
- /**
- * @return
- * @throws Exception
- */
- JournalFile getFile(final boolean keepOpened,
- final boolean multiAIO,
- final boolean initFile,
- final boolean tmpCompactExtension) throws Exception
- {
- JournalFile nextOpenedFile = null;
- nextOpenedFile = freeFiles.poll();
- if (nextOpenedFile == null)
- {
- nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
- }
- else
- {
- if (tmpCompactExtension)
- {
- SequentialFile sequentialFile = nextOpenedFile.getFile();
- sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
- }
- if (keepOpened)
- {
- openFile(nextOpenedFile, multiAIO);
- }
- }
- return nextOpenedFile;
- }
- private void closeFile(final JournalFile file, final boolean synchronous)
- {
- fileFactory.deactivateBuffer();
- pendingCloseFiles.add(file);
- dataFiles.add(file);
- Runnable run = new Runnable()
- {
- public void run()
- {
- drainClosedFiles();
- }
- };
- if (synchronous)
- {
- run.run();
- }
- else
- {
- filesExecutor.execute(run);
- }
- }
- private void drainClosedFiles()
- {
- JournalFile file;
- try
- {
- while ((file = pendingCloseFiles.poll()) != null)
- {
- file.getFile().close();
- }
- }
- catch (Exception e)
- {
- JournalImpl.log.warn(e.getMessage(), e);
- }
- }
private JournalTransaction getTransactionInfo(final long txID)
JournalTransaction tx = transactions.get(txID);
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalRecord.java
--- trunk/src/main/org/hornetq/core/journal/impl/JournalRecord.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalRecord.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -74,6 +74,7 @@
+ @Override
public String toString()
StringBuffer buffer = new StringBuffer();
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java
--- trunk/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -29,6 +29,6 @@
public interface JournalRecordProvider
JournalCompactor getCompactor();
Map<Long, JournalRecord> getRecords();
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
--- trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -61,10 +61,10 @@
this.id = id;
this.journal = journal;
- public void replaceRecordProvider(JournalRecordProvider provider)
+ public void replaceRecordProvider(final JournalRecordProvider provider)
- this.journal = provider;
+ journal = provider;
@@ -329,7 +329,7 @@
JournalRecord posFiles = journal.getRecords().remove(trDelete.id);
if (posFiles != null)
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -260,14 +260,12 @@
internalWrite(bytes, sync, null);
- public void writeInternal(ByteBuffer bytes) throws Exception
+ public void writeInternal(final ByteBuffer bytes) throws Exception
internalWrite(bytes, true, null);
protected ByteBuffer newBuffer(int size, final int limit)
@@ -292,7 +290,7 @@
if (maxIOSemaphore == null)
Modified: trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java
--- trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -61,20 +61,24 @@
Reclaimer.trace("posCount on " + currentFile + " = " + posCount);
for (int j = i; j < files.length; j++)
if (Reclaimer.trace)
if (files[j].getNegCount(currentFile) != 0)
- Reclaimer.trace("Negative from " + files[j] + " into " + currentFile + " = " + files[j].getNegCount(currentFile));
+ Reclaimer.trace("Negative from " + files[j] +
+ " into " +
+ currentFile +
+ " = " +
+ files[j].getNegCount(currentFile));
totNeg += files[j].getNegCount(currentFile);
if (posCount <= totNeg)
@@ -99,7 +103,7 @@
Reclaimer.trace(currentFile + " Can't be reclaimed because " + file + " has negative values");
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -127,7 +127,6 @@
// Need to start with the spin limiter acquired
@@ -207,7 +206,7 @@
throw new IllegalStateException("TimedBuffer is not started");
if (sizeChecked > bufferSize)
throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize +
@@ -259,7 +258,7 @@
throw new IllegalStateException("TimedBuffer is not started");
delayFlush = false;
@@ -306,7 +305,7 @@
throw new IllegalStateException("TimedBuffer is not started");
if ((force || !delayFlush) && buffer.writerIndex() > 0)
int pos = buffer.writerIndex();
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -40,7 +40,7 @@
private final boolean isCommit;
- private final long txID;
+ public final long txID;
private final EncodingSupport transactionData;
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -25,7 +25,7 @@
public class JournalRollbackRecordTX extends JournalInternalRecord
- private final long txID;
+ public final long txID;
public JournalRollbackRecordTX(final long txID)
Modified: trunk/src/main/org/hornetq/core/paging/PageTransactionInfo.java
--- trunk/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -14,6 +14,8 @@
package org.hornetq.core.paging;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.transaction.Transaction;
@@ -37,10 +39,15 @@
void setRecordID(long id);
long getTransactionID();
+ void store(StorageManager storageManager,Transaction tx) throws Exception;
+ void storeUpdate(StorageManager storageManager, Transaction tx, int depages) throws Exception;
- int increment();
+ // To be used after the update was stored or reload
+ void update(int update, StorageManager storageManager);
- int decrement();
+ void increment();
int getNumberOfMessages();
Modified: trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
--- trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -18,7 +18,11 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.utils.DataConstants;
@@ -30,11 +34,13 @@
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PageTransactionInfoImpl.class);
// Attributes ----------------------------------------------------
private long transactionID;
- private volatile long recordID;
+ private volatile long recordID = -1;
private volatile CountDownLatch countDownCompleted;
@@ -42,7 +48,7 @@
private volatile boolean rolledback;
- private final AtomicInteger numberOfMessages = new AtomicInteger(0);
+ private AtomicInteger numberOfMessages = new AtomicInteger(0);
// Static --------------------------------------------------------
@@ -75,21 +81,25 @@
return transactionID;
- public int increment()
+ public void update(final int update, final StorageManager storageManager)
- return numberOfMessages.incrementAndGet();
+ int sizeAfterUpdate = numberOfMessages.addAndGet(-update);
+ if (sizeAfterUpdate == 0 && storageManager != null)
+ {
+ try
+ {
+ storageManager.deletePageTransactional(this.recordID);
+ }
+ catch (Exception e)
+ {
+ log.warn("Can't delete page transaction id=" + this.recordID);
+ }
+ }
- public int decrement()
+ public void increment()
- final int value = numberOfMessages.decrementAndGet();
- if (value < 0)
- {
- throw new IllegalStateException("Internal error Negative value on Paging transactions!");
- }
- return value;
+ numberOfMessages.incrementAndGet();
public int getNumberOfMessages()
@@ -103,10 +113,8 @@
transactionID = buffer.readLong();
- countDownCompleted = null; // if it is being readed, probably it was
- // committed
- committed = true; // Unless it is a incomplete prepare, which is marked by
- // markIcomplete
+ countDownCompleted = null;
+ committed = true;
public synchronized void encode(final HornetQBuffer buffer)
@@ -141,6 +149,49 @@
+ public void store(final StorageManager storageManager, final Transaction tx) throws Exception
+ {
+ storageManager.storePageTransaction(tx.getID(), this);
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PageTransactionInfo#storeUpdate(org.hornetq.core.persistence.StorageManager, org.hornetq.core.transaction.Transaction, int)
+ */
+ public void storeUpdate(final StorageManager storageManager, final Transaction tx, final int depages) throws Exception
+ {
+ storageManager.updatePageTransaction(tx.getID(), this, depages);
+ final PageTransactionInfo pgToUpdate = this;
+ tx.addOperation(new TransactionOperation()
+ {
+ public void beforeRollback(Transaction tx) throws Exception
+ {
+ }
+ public void beforePrepare(Transaction tx) throws Exception
+ {
+ }
+ public void beforeCommit(Transaction tx) throws Exception
+ {
+ }
+ public void afterRollback(Transaction tx)
+ {
+ }
+ public void afterPrepare(Transaction tx)
+ {
+ }
+ public void afterCommit(Transaction tx)
+ {
+ pgToUpdate.update(depages, storageManager);
+ }
+ });
+ }
public boolean isCommit()
return committed;
@@ -166,6 +217,16 @@
countDownCompleted = new CountDownLatch(1);
+ public String toString()
+ {
+ return "PageTransactionInfoImpl(transactionID=" + transactionID +
+ ",id=" +
+ recordID +
+ ",numberOfMessages=" +
+ numberOfMessages +
+ ")";
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -14,8 +14,9 @@
package org.hornetq.core.paging.impl;
import java.text.DecimalFormat;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
@@ -958,7 +959,7 @@
depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
- HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
+ HashMap<PageTransactionInfo, AtomicInteger> pageTransactionsToUpdate = new HashMap<PageTransactionInfo, AtomicInteger>();
for (PagedMessage pagedMessage : pagedMessages)
@@ -978,6 +979,7 @@
final long transactionIdDuringPaging = pagedMessage.getTransactionID();
PageTransactionInfo pageUserTransaction = null;
+ AtomicInteger countPageTX = null;
if (transactionIdDuringPaging >= 0)
@@ -992,6 +994,12 @@
+ countPageTX = pageTransactionsToUpdate.get(pageUserTransaction);
+ if (countPageTX == null)
+ {
+ countPageTX = new AtomicInteger();
+ pageTransactionsToUpdate.put(pageUserTransaction, countPageTX);
+ }
// This is to avoid a race condition where messages are depaged
// before the commit arrived
@@ -1036,8 +1044,7 @@
// This needs to be done after routing because of duplication detection
if (pageUserTransaction != null && message.isDurable())
- pageUserTransaction.decrement();
- pageTransactionsToUpdate.add(pageUserTransaction);
+ countPageTX.incrementAndGet();
@@ -1047,21 +1054,12 @@
return false;
- for (PageTransactionInfo pageWithTransaction : pageTransactionsToUpdate)
+ for (Map.Entry<PageTransactionInfo, AtomicInteger> entry : pageTransactionsToUpdate.entrySet())
// This will set the journal transaction to commit;
- if (pageWithTransaction.getNumberOfMessages() == 0)
- {
- // numberOfReads==numberOfWrites -> We delete the record
- storageManager.deletePageTransactional(depageTransaction.getID(), pageWithTransaction.getRecordID());
- pagingManager.removeTransaction(pageWithTransaction.getTransactionID());
- }
- else
- {
- storageManager.storePageTransaction(depageTransaction.getID(), pageWithTransaction);
- }
+ entry.getKey().storeUpdate(storageManager, depageTransaction, entry.getValue().intValue());
Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -134,8 +134,10 @@
void rollback(long txID) throws Exception;
void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception;
+ void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception;
- void deletePageTransactional(long txID, long recordID) throws Exception;
+ void deletePageTransactional(long recordID) throws Exception;
/** This method is only useful at the backup side. We only load internal structures making the journals ready for
* append mode on the backup side. */
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -575,13 +575,6 @@
public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
- if (pageTransaction.getRecordID() != 0)
- {
- // Instead of updating the record, we delete the old one as that is
- // better for reclaiming
- messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID());
- }
@@ -590,6 +583,11 @@
+ public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception
+ {
+ messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), JournalStorageManager.PAGE_TRANSACTION, new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages));
+ }
public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
@@ -623,9 +621,9 @@
messageJournal.appendDeleteRecord(id, true, getContext(true));
- public void deletePageTransactional(final long txID, final long recordID) throws Exception
+ public void deletePageTransactional(final long recordID) throws Exception
- messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ messageJournal.appendDeleteRecord(recordID, false);
public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
@@ -907,14 +905,27 @@
- PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+ if (record.isUpdate)
+ {
+ PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
+ pageUpdate.decode(buff);
+ PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX);
+ pageTX.update(pageUpdate.recods, null);
+ }
+ else
+ {
+ PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+ pageTransactionInfo.decode(buff);
+ pageTransactionInfo.setRecordID(record.id);
+ pagingManager.addTransaction(pageTransactionInfo);
+ }
- pageTransactionInfo.decode(buff);
- pageTransactionInfo.setRecordID(record.id);
- pagingManager.addTransaction(pageTransactionInfo);
@@ -2006,7 +2017,49 @@
+ private static class PageUpdateTXEncoding implements EncodingSupport
+ {
+ public long pageTX;
+ public int recods;
+ public PageUpdateTXEncoding()
+ {
+ }
+ public PageUpdateTXEncoding(final long pageTX, final int records)
+ {
+ this.pageTX = pageTX;
+ this.recods = records;
+ }
+ public void decode(HornetQBuffer buffer)
+ {
+ this.pageTX = buffer.readLong();
+ this.recods = buffer.readInt();
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeLong(pageTX);
+ buffer.writeInt(recods);
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
+ }
+ }
private static class ScheduledDeliveryEncoding extends QueueEncoding
long scheduledDeliveryTime;
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -436,4 +436,18 @@
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deletePageTransactional(long)
+ */
+ public void deletePageTransactional(long recordID) throws Exception
+ {
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#updatePageTransaction(long, org.hornetq.core.paging.PageTransactionInfo, int)
+ */
+ public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception
+ {
+ }
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -58,9 +58,9 @@
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
+import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUIDGenerator;
@@ -614,11 +614,20 @@
Transaction tx = context.getTransaction();
boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
- if (!depage && message.storeIsPaging())
+ // if the TX paged at least one message on a give address, all the other addresses should also go towards paging cache now
+ boolean alreadyPaging = false;
+ if (tx.isPaging())
+ alreadyPaging = getPageOperation(tx).isPaging(message.getAddress());
+ }
+ if (!depage && message.storeIsPaging() || alreadyPaging)
+ {
+ tx.setPaging(true);
if (startedTx)
@@ -1106,12 +1115,20 @@
private final List<ServerMessage> messagesToPage = new ArrayList<ServerMessage>();
+ private final HashSet<SimpleString> addressesPaging = new HashSet<SimpleString>();
private Transaction subTX = null;
void addMessageToPage(final ServerMessage message)
+ addressesPaging.add(message.getAddress());
+ boolean isPaging(final SimpleString address)
+ {
+ return addressesPaging.contains(address);
+ }
public void afterCommit(final Transaction tx)
@@ -1229,7 +1246,15 @@
subTX = tx.copy();
route(message, subTX, false);
+ if (subTX.isContainsPersistent())
+ {
+ // The route wouldn't be able to update the persistent flag on the main TX
+ // If we don't do this we would eventually miss a commit record
+ tx.setContainsPersistent();
+ }
@@ -1244,7 +1269,7 @@
- storageManager.storePageTransaction(tx.getID(), pageTransaction);
+ pageTransaction.store(storageManager, tx);
Modified: trunk/src/main/org/hornetq/core/transaction/Transaction.java
--- trunk/src/main/org/hornetq/core/transaction/Transaction.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/transaction/Transaction.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -60,12 +60,20 @@
void removeOperation(TransactionOperation sync);
boolean hasTimedOut(long currentTime, int defaultTimeout);
+ /** We don't want to look on operations at every send, so we keep the paging attribute and will only look at
+ * the PagingOperation case this attribute is true*/
+ boolean isPaging();
+ void setPaging(boolean paging);
void putProperty(int index, Object property);
Object getProperty(int index);
void setContainsPersistent();
+ boolean isContainsPersistent();
void setTimeout(int timeout);
Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -47,6 +47,8 @@
private final Xid xid;
private final long id;
+ private boolean paging = false;
private volatile State state = State.ACTIVE;
@@ -129,6 +131,11 @@
containsPersistent = true;
+ public boolean isContainsPersistent()
+ {
+ return containsPersistent;
+ }
public void setTimeout(final int timeout)
@@ -352,7 +359,17 @@
this.state = state;
+ public boolean isPaging()
+ {
+ return paging;
+ }
+ public void setPaging(boolean paging)
+ {
+ this.paging = paging;
+ }
public Xid getXid()
return xid;
Modified: trunk/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java
--- trunk/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -14,6 +14,7 @@
package org.hornetq.jms.persistence.config;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.utils.BufferHelper;
import org.hornetq.utils.DataConstants;
@@ -112,16 +113,17 @@
public void encode(final HornetQBuffer buffer)
- buffer.writeString(name);
- buffer.writeNullableString(selector);
+ buffer.writeSimpleString(SimpleString.toSimpleString(name));
+ buffer.writeNullableSimpleString(SimpleString.toSimpleString(selector));
public void decode(final HornetQBuffer buffer)
type = PersistedType.getType(buffer.readByte());
- name = buffer.readString();
- selector = buffer.readNullableString();
+ name = buffer.readSimpleString().toString();
+ SimpleString selectorStr = buffer.readNullableSimpleString();
+ selector = (selectorStr == null) ? null : selectorStr.toString();
durable = buffer.readBoolean();
Modified: trunk/src/main/org/hornetq/utils/ReusableLatch.java
--- trunk/src/main/org/hornetq/utils/ReusableLatch.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/utils/ReusableLatch.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -87,6 +87,11 @@
int newState = actualState - numberOfReleases;
+ if (newState < 0)
+ {
+ newState = 0;
+ }
if (compareAndSetState(actualState, newState))
@@ -128,6 +133,12 @@
+ public void countDown(final int count)
+ {
+ control.releaseShared(count);
+ }
public void await() throws InterruptedException
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -14,8 +14,10 @@
package org.hornetq.tests.integration.client;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import junit.framework.AssertionFailedError;
@@ -30,6 +32,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.server.HornetQServer;
@@ -38,7 +41,6 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.DataConstants;
* A PagingTest
@@ -86,12 +88,192 @@
public void testSendReceivePagingNonPersistent() throws Exception
+ public void testWithDiverts() throws Exception
+ {
+ clearData();
+ Configuration config = createDefaultConfig();
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ DivertConfiguration divert1 = new DivertConfiguration("dv1",
+ "nm1",
+ PagingTest.ADDRESS.toString(),
+ PagingTest.ADDRESS.toString() + "-1",
+ true,
+ null,
+ null);
+ DivertConfiguration divert2 = new DivertConfiguration("dv2",
+ "nm2",
+ PagingTest.ADDRESS.toString(),
+ PagingTest.ADDRESS.toString() + "-2",
+ true,
+ null,
+ null);
+ ArrayList<DivertConfiguration> divertList = new ArrayList<DivertConfiguration>();
+ divertList.add(divert1);
+ divertList.add(divert2);
+ config.setDivertConfigurations(divertList);
+ server.start();
+ final int numberOfIntegers = 256;
+ final int numberOfMessages = 30000;
+ final byte[] body = new byte[numberOfIntegers * 4];
+ ByteBuffer bb = ByteBuffer.wrap(body);
+ for (int j = 1; j <= numberOfIntegers; j++)
+ {
+ bb.putInt(j);
+ }
+ try
+ {
+ {
+ ClientSessionFactory sf = createInVMFactory();
+ sf.setBlockOnNonDurableSend(true);
+ sf.setBlockOnDurableSend(true);
+ sf.setBlockOnAcknowledge(true);
+ ClientSession session = sf.createSession(false, false, false);
+ session.createQueue(PagingTest.ADDRESS + "-1", PagingTest.ADDRESS + "-1", null, true);
+ session.createQueue(PagingTest.ADDRESS + "-2", PagingTest.ADDRESS + "-2", null, true);
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+ ClientMessage message = null;
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+ bodyLocal.writeBytes(body);
+ message.putIntProperty(new SimpleString("id"), i);
+ producer.send(message);
+ }
+ session.commit();
+ session.close();
+ server.stop();
+ }
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+ final ClientSessionFactory sf2 = createInVMFactory();
+ final AtomicInteger errors = new AtomicInteger(0);
+ Thread threads[] = new Thread[2];
+ for (int start = 1; start <= 2; start++)
+ {
+ final String addressToSubscribe = PagingTest.ADDRESS + "-" + start;
+ threads[start - 1] = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ ClientSession session = sf2.createSession(null, null, false, true, true, false, 0);
+ ClientConsumer consumer = session.createConsumer(addressToSubscribe);
+ session.start();
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(PagingTest.RECEIVE_TIMEOUT);
+ Assert.assertNotNull(message2);
+ Assert.assertEquals(i, message2.getIntProperty("id").intValue());
+ message2.acknowledge();
+ Assert.assertNotNull(message2);
+ session.commit();
+ try
+ {
+ assertBodiesEqual(body, message2.getBodyBuffer());
+ }
+ catch (AssertionFailedError e)
+ {
+ PagingTest.log.info("Expected buffer:" + UnitTestCase.dumbBytesHex(body, 40));
+ PagingTest.log.info("Arriving buffer:" + UnitTestCase.dumbBytesHex(message2.getBodyBuffer()
+ .toByteBuffer()
+ .array(), 40));
+ throw e;
+ }
+ }
+ consumer.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+ }
+ for (int i = 0; i < 2; i++)
+ {
+ threads[i].start();
+ }
+ for (int i = 0; i < 2; i++)
+ {
+ threads[i].join();
+ }
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
private void internaltestSendReceivePaging(final boolean persistentMessages) throws Exception
@@ -150,11 +332,10 @@
if (persistentMessages)
server = createServer(true,
@@ -360,8 +541,7 @@
* - Make a destination in page mode
* - Add stuff to a transaction
@@ -396,7 +576,7 @@
byte[] body = new byte[messageSize];
ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0);
@@ -408,7 +588,7 @@
ClientMessage firstMessage = sessionTransacted.createMessage(IS_DURABLE_MESSAGE);
firstMessage.putIntProperty(new SimpleString("id"), 0);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
@@ -479,7 +659,7 @@
Integer messageID = (Integer)message.getObjectProperty(new SimpleString("id"));
-// System.out.println(messageID);
+ // System.out.println(messageID);
Assert.assertEquals("message received out of order", i, messageID.intValue());
@@ -505,8 +685,133 @@
+ public void testDepageDuringTransaction3() throws Exception
+ {
+ clearData();
+ Configuration config = createDefaultConfig();
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+ final int messageSize = 1024; // 1k
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+ sf.setBlockOnNonDurableSend(true);
+ sf.setBlockOnDurableSend(true);
+ sf.setBlockOnAcknowledge(true);
+ byte[] body = new byte[messageSize];
+ ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0);
+ ClientProducer producerTransacted = sessionTransacted.createProducer(PagingTest.ADDRESS);
+ ClientSession sessionNonTX = sf.createSession(true, true, 0);
+ sessionNonTX.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+ ClientProducer producerNonTransacted = sessionNonTX.createProducer(PagingTest.ADDRESS);
+ sessionNonTX.start();
+ for (int i = 0; i < 50; i++)
+ {
+ System.out.println("Sending " + i);
+ ClientMessage message = sessionNonTX.createMessage(true);
+ message.getBodyBuffer().writeBytes(body);
+ message.putIntProperty(new SimpleString("id"), i);
+ producerTransacted.send(message);
+ if (i % 2 == 0)
+ {
+ System.out.println("Sending 20 msgs to make it page");
+ for (int j = 0 ; j < 20; j++)
+ {
+ ClientMessage msgSend = sessionNonTX.createMessage(true);
+ msgSend.getBodyBuffer().writeBytes(new byte[10 * 1024]);
+ producerNonTransacted.send(msgSend);
+ }
+ assertTrue(server.getPostOffice().getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging());
+ }
+ else
+ {
+ System.out.println("Consuming 20 msgs to make it page");
+ ClientConsumer consumer = sessionNonTX.createConsumer(PagingTest.ADDRESS);
+ for (int j = 0 ; j < 20; j++)
+ {
+ ClientMessage msgReceived = consumer.receive(10000);
+ assertNotNull(msgReceived);
+ msgReceived.acknowledge();
+ }
+ consumer.close();
+ }
+ }
+ ClientConsumer consumerNonTX = sessionNonTX.createConsumer(PagingTest.ADDRESS);
+ while (true)
+ {
+ ClientMessage msgReceived = consumerNonTX.receive(1000);
+ if (msgReceived == null)
+ {
+ break;
+ }
+ msgReceived.acknowledge();
+ }
+ consumerNonTX.close();
+ ClientConsumer consumer = sessionNonTX.createConsumer(PagingTest.ADDRESS);
+ Assert.assertNull(consumer.receiveImmediate());
+ sessionTransacted.commit();
+ sessionTransacted.close();
+ for (int i = 0; i < 50; i++)
+ {
+ ClientMessage message = consumer.receive(PagingTest.RECEIVE_TIMEOUT);
+ Assert.assertNotNull(message);
+ Integer messageID = (Integer)message.getObjectProperty(new SimpleString("id"));
+ // System.out.println(messageID);
+ Assert.assertNotNull(messageID);
+ Assert.assertEquals("message received out of order", i, messageID.intValue());
+ System.out.println("MessageID = " + messageID);
+ message.acknowledge();
+ }
+ Assert.assertNull(consumer.receiveImmediate());
+ consumer.close();
+ sessionNonTX.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
public void testPageOnSchedulingNoRestart() throws Exception
@@ -756,8 +1061,6 @@
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
for (int i = 0; i < numberOfMessages; i++)
@@ -955,7 +1258,7 @@
public void testDropMessagesExpiring() throws Exception
@@ -969,7 +1272,7 @@
settings.put(PagingTest.ADDRESS.toString(), set);
- HornetQServer server = createServer(true, config, 1024, 1024 * 1024, settings);
+ HornetQServer server = createServer(true, config, 1024, 1024 * 1024, settings);
@@ -978,7 +1281,7 @@
ClientSessionFactory sf = createInVMFactory();
ClientSession session = sf.createSession();
@@ -988,7 +1291,7 @@
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
class MyHandler implements MessageHandler
int count;
@@ -1001,16 +1304,16 @@
catch (Exception e)
if (count % 1000 == 0)
log.info("received " + count);
@@ -1019,13 +1322,13 @@
- }
+ }
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
consumer.setMessageHandler(new MyHandler());
for (int i = 0; i < numberOfMessages; i++)
@@ -1034,12 +1337,12 @@
message = session.createMessage(false);
message.setExpiration(System.currentTimeMillis() + 100);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -53,9 +53,9 @@
private static final Logger log = Logger.getLogger(SessionFactoryTest.class);
- private final String groupAddress = "";
+ private final String groupAddress = getUDPDiscoveryAddress();
- private final int groupPort = 8765;
+ private final int groupPort = getUDPDiscoveryPort();
private HornetQServer liveService;
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -80,8 +80,8 @@
final String forwardAddress = "forwardAddress";
final String queueName1 = "queue1";
- final String groupAddress = "";
- final int port = 7746;
+ final String groupAddress = getUDPDiscoveryAddress();
+ final int port = getUDPDiscoveryPort();
List<Pair<String, String>> connectorPairs = new ArrayList<Pair<String, String>>();
connectorPairs.add(new Pair<String, String>(server1tc.getName(), null));
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -28,9 +28,9 @@
private static final Logger log = Logger.getLogger(SymmetricClusterWithDiscoveryTest.class);
- protected static final String groupAddress = "";
+ protected static final String groupAddress = getUDPDiscoveryAddress();
- protected static final int groupPort = 6745;
+ protected static final int groupPort = getUDPDiscoveryPort();
protected boolean isNetty()
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -35,9 +35,9 @@
private static final Logger log = Logger.getLogger(DiscoveryClusterWithBackupFailoverTest.class);
- protected static final String groupAddress = "";
+ protected static final String groupAddress = getUDPDiscoveryAddress();
- protected static final int groupPort = 6745;
+ protected static final int groupPort = getUDPDiscoveryPort();
protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
Modified: trunk/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java
--- trunk/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -53,16 +53,16 @@
private static final Logger log = Logger.getLogger(DiscoveryTest.class);
- private static final String address1 = "";
+ private static final String address1 = getUDPDiscoveryAddress();
- private static final String address2 = "";
+ private static final String address2 = getUDPDiscoveryAddress(1);
- private static final String address3 = "";
+ private static final String address3 = getUDPDiscoveryAddress(2);
public void testSimpleBroadcast() throws Exception
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
final String nodeID = RandomUtil.randomString();
@@ -122,7 +122,7 @@
public void testSimpleBroadcastSpecificNIC() throws Exception
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
final String nodeID = RandomUtil.randomString();
@@ -222,7 +222,7 @@
public void testSimpleBroadcastWithStopStartDiscoveryGroup() throws Exception
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
final String nodeID = RandomUtil.randomString();
@@ -304,7 +304,7 @@
public void testIgnoreTrafficFromOwnNode() throws Exception
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
String nodeID = RandomUtil.randomString();
@@ -357,7 +357,7 @@
// public void testSimpleBroadcastDifferentAddress() throws Exception
// {
// final InetAddress groupAddress = InetAddress.getByName(address1);
- // final int groupPort = 6745;
+ // final int groupPort = getUDPDiscoveryPort();
// final int timeout = 500;
// BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
@@ -394,8 +394,8 @@
public void testSimpleBroadcastDifferentPort() throws Exception
- final InetAddress groupAddress = InetAddress.getByName("");
- final int groupPort = 6745;
+ final InetAddress groupAddress = InetAddress.getByName(getUDPDiscoveryAddress());
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
BroadcastGroup bg = new BroadcastGroupImpl(RandomUtil.randomString(),
@@ -417,7 +417,7 @@
- final int port2 = 6746;
+ final int port2 = getUDPDiscoveryPort(1);
DiscoveryGroup dg = new DiscoveryGroupImpl(RandomUtil.randomString(),
@@ -442,7 +442,7 @@
public void testSimpleBroadcastDifferentAddressAndPort() throws Exception
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
BroadcastGroup bg = new BroadcastGroupImpl(RandomUtil.randomString(),
@@ -465,7 +465,7 @@
final InetAddress groupAddress2 = InetAddress.getByName(DiscoveryTest.address2);
- final int port2 = 6746;
+ final int port2 = getUDPDiscoveryPort(1);
DiscoveryGroup dg = new DiscoveryGroupImpl(RandomUtil.randomString(),
@@ -490,13 +490,13 @@
public void testMultipleGroups() throws Exception
final InetAddress groupAddress1 = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort1 = 6745;
+ final int groupPort1 = getUDPDiscoveryPort();
final InetAddress groupAddress2 = InetAddress.getByName(DiscoveryTest.address2);
- final int groupPort2 = 6746;
+ final int groupPort2 = getUDPDiscoveryPort(1);
final InetAddress groupAddress3 = InetAddress.getByName(DiscoveryTest.address3);
- final int groupPort3 = 6747;
+ final int groupPort3 = getUDPDiscoveryPort(2);
final int timeout = 500;
@@ -624,7 +624,7 @@
public void testBroadcastNullBackup() throws Exception
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
String nodeID = RandomUtil.randomString();
@@ -677,7 +677,7 @@
public void testDiscoveryListenersCalled() throws Exception
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
String nodeID = RandomUtil.randomString();
@@ -745,7 +745,7 @@
public void testConnectorsUpdatedMultipleBroadcasters() throws Exception
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
String node1 = RandomUtil.randomString();
@@ -1014,7 +1014,7 @@
public void testMultipleDiscoveryGroups() throws Exception
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
String nodeID = RandomUtil.randomString();
@@ -1105,7 +1105,7 @@
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
DiscoveryGroup dg = new DiscoveryGroupImpl(RandomUtil.randomString(),
@@ -1144,7 +1144,7 @@
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
BroadcastGroup bg = new BroadcastGroupImpl(RandomUtil.randomString(),
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
--- trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -51,9 +51,9 @@
private static final Logger log = Logger.getLogger(HornetQConnectionFactoryTest.class);
- private final String groupAddress = "";
+ private final String groupAddress = getUDPDiscoveryAddress();
- private final int groupPort = 8765;
+ private final int groupPort = getUDPDiscoveryPort();
private HornetQServer liveService;
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlRestartTest.java
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlRestartTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlRestartTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -13,27 +13,36 @@
package org.hornetq.tests.integration.jms.server.management;
+import java.io.File;
+import javax.jms.Connection;
+import javax.jms.Message;
import javax.jms.Queue;
+import javax.jms.QueueRequestor;
+import javax.jms.QueueSession;
+import javax.jms.Session;
import junit.framework.Assert;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ObjectNameBuilder;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.management.JMSManagementHelper;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
-import org.hornetq.jms.persistence.JMSStorageManager;
-import org.hornetq.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.integration.management.ManagementControlHelper;
import org.hornetq.tests.integration.management.ManagementTestBase;
import org.hornetq.tests.unit.util.InVMContext;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.TimeAndCounterIDGenerator;
* A JMSServerControlRestartTest
@@ -51,17 +60,15 @@
protected InVMContext context;
- private HornetQServer server;
+ private JMSServerManager serverManager;
- private JMSServerManagerImpl serverManager;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
- public void testCreateDurableQueueAndRestartServer() throws Exception
+ public void testCreateDurableQueueUsingJMXAndRestartServer() throws Exception
String queueName = RandomUtil.randomString();
String binding = RandomUtil.randomString();
@@ -79,10 +86,52 @@
+ checkNoBinding(context, binding);
+ checkNoResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
+ serverManager = createJMSServer();
+ serverManager.start();
+ o = UnitTestCase.checkBinding(context, binding);
+ Assert.assertTrue(o instanceof Queue);
+ queue = (Queue)o;
+ assertEquals(queueName, queue.getQueueName());
+ checkResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
+ }
+ public void testCreateDurableQueueUsingJMSAndRestartServer() throws Exception
+ {
+ String queueName = RandomUtil.randomString();
+ String binding = RandomUtil.randomString();
+ UnitTestCase.checkNoBinding(context, binding);
+ checkNoResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
+ TransportConfiguration config = new TransportConfiguration(InVMConnectorFactory.class.getName());
+ Connection connection = HornetQJMSClient.createConnectionFactory(config).createConnection();
+ connection.start();
+ Queue managementQueue = HornetQJMSClient.createQueue("hornetq.management");
+ QueueSession session = (QueueSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueRequestor requestor = new QueueRequestor(session, managementQueue);
+ Message message = session.createMessage();
+ JMSManagementHelper.putOperationInvocation(message, "jms.server", "createQueue", queueName, binding);
+ Message reply = requestor.request(message);
+ assertTrue(JMSManagementHelper.hasOperationSucceeded(reply));
+ connection.close();
+ Object o = UnitTestCase.checkBinding(context, binding);
+ Assert.assertTrue(o instanceof Queue);
+ Queue queue = (Queue)o;
+ assertEquals(queueName, queue.getQueueName());
+ checkResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
+ serverManager.stop();
checkNoBinding(context, binding);
+ serverManager = createJMSServer();
o = UnitTestCase.checkBinding(context, binding);
@@ -101,35 +150,35 @@
+ serverManager = createJMSServer();
+ serverManager.start();
+ }
+ private JMSServerManager createJMSServer() throws Exception {
Configuration conf = new ConfigurationImpl();
+ conf.setPersistenceEnabled(true);
+ conf.setJournalType(JournalType.NIO);
conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
- server = HornetQServers.newHornetQServer(conf, mbeanServer, false);
+ HornetQServer server = HornetQServers.newHornetQServer(conf, mbeanServer);
context = new InVMContext();
- JMSStorageManager storage = new JMSJournalStorageManagerImpl(new TimeAndCounterIDGenerator(),
- server.getConfiguration(),
- server.getReplicationManager());
- serverManager = new JMSServerManagerImpl(server, null, storage);
+ serverManager = new JMSServerManagerImpl(server);
- serverManager.start();
- serverManager.activated();
+ return serverManager;
protected void tearDown() throws Exception
+ String bindingDir = serverManager.getHornetQServer().getConfiguration().getBindingsDirectory();
- server.stop();
serverManager = null;
+ System.out.println(bindingDir);
+ deleteDirectory(new File(bindingDir));
- server = null;
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -23,11 +23,13 @@
import junit.framework.Assert;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AbstractJournalUpdateTask;
+import org.hornetq.core.journal.impl.ExportJournal;
import org.hornetq.core.journal.impl.JournalCompactor;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalFileImpl;
@@ -38,7 +40,6 @@
import org.hornetq.tests.unit.core.journal.impl.fakes.SimpleEncoding;
import org.hornetq.utils.IDGenerator;
import org.hornetq.utils.SimpleIDGenerator;
-import org.hornetq.utils.TimeAndCounterIDGenerator;
@@ -53,7 +54,7 @@
private static final int NUMBER_OF_RECORDS = 1000;
- IDGenerator idGenerator = new TimeAndCounterIDGenerator();
+ IDGenerator idGenerator = new SimpleIDGenerator(100000);
// General tests
// =============
@@ -232,6 +233,119 @@
+ public void testCompactPrepareRestart() throws Exception
+ {
+ setup(2, 60 * 1024, false);
+ createJournal();
+ startJournal();
+ load();
+ startCompact();
+ addTx(1, 2);
+ prepare(1, new SimpleEncoding(10, (byte)0));
+ finishCompact();
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ startCompact();
+ commit(1);
+ finishCompact();
+ journal.compact();
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+ public void testCompactPrepareRestart2() throws Exception
+ {
+ setup(2, 60 * 1024, false);
+ createJournal();
+ startJournal();
+ load();
+ addTx(1, 2);
+ prepare(1, new SimpleEncoding(10, (byte)0));
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ startCompact();
+ commit(1);
+ finishCompact();
+ journal.compact();
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+ public void testCompactPrepareRestart3() throws Exception
+ {
+ setup(2, 60 * 1024, false);
+ createJournal();
+ startJournal();
+ load();
+ addTx(1, 2, 3);
+ prepare(1, new SimpleEncoding(10, (byte)0));
+ startCompact();
+ commit(1);
+ finishCompact();
+ journal.compact();
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
public void testOnRollback() throws Exception
@@ -277,13 +391,13 @@
addTx(1, 5, 6, 7, 8);
@@ -806,7 +920,7 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
@@ -931,7 +1045,7 @@
// Delete part of the live records while cleanup still working
for (int i = 1; i < 5; i++)
@@ -939,7 +1053,7 @@
// Delete part of the live records after cleanup is done
for (int i = 5; i < 10; i++)
@@ -963,7 +1077,7 @@
setup(2, 60 * 1024, false);
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
@@ -978,7 +1092,7 @@
addTx(appendTX, appendOne);
addTx(appendTX, appendTwo);
@@ -1161,7 +1275,7 @@
public void testCompactFirstFileWithPendingCommits() throws Exception
setup(2, 60 * 1024, true);
@@ -1175,10 +1289,165 @@
addTx(tx, idGenerator.generateID());
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ if (i == 5)
+ {
+ commit(tx);
+ }
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+ journal.forceMoveNextFile();
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+ journal.forceMoveNextFile();
+ // This operation used to be journal.cleanup(journal.getDataFiles()[0]); when cleanup was still in place
+ journal.compact();
+ journal.checkReclaimStatus();
+ journal.compact();
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+ public void testCompactFirstFileWithPendingCommits3() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ long tx = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ addTx(tx, idGenerator.generateID());
+ }
+ journal.forceMoveNextFile();
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+ journal.forceMoveNextFile();
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+ journal.forceMoveNextFile();
+ ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out1.dmp");
+ ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out2.dmp");
+ rollback(tx);
+ ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out3.dmp");
+ journal.forceMoveNextFile();
+ journal.checkReclaimStatus();
+ ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out4.dmp");
+ journal.compact();
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+ public void testCompactFirstFileWithPendingCommits2() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ long tx = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ addTx(tx, idGenerator.generateID());
+ }
+ journal.forceMoveNextFile();
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+ journal.forceMoveNextFile();
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+ journal.forceMoveNextFile();
+ startCompact();
+ System.out.println("Committing TX " + tx);
+ finishCompact();
+ journal.checkReclaimStatus();
+ journal.compact();
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+ public void testCompactFirstFileWithPendingCommits4() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ long ids[] = new long[10];
+ long tx0 = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ ids[i] = idGenerator.generateID();
+ addTx(tx0, ids[i]);
+ }
+ long tx1 = idGenerator.generateID();
+ journal.forceMoveNextFile();
ArrayList<Long> listToDelete = new ArrayList<Long>();
for (int i = 0; i < 10; i++)
@@ -1187,21 +1456,157 @@
for (Long id : listToDelete)
+ journal.forceMoveNextFile();
+ startCompact();
+ System.out.println("Committing TX " + tx1);
+ rollback(tx0);
+ for (int i = 0 ; i < 10; i++)
+ {
+ addTx(tx1, ids[i]);
+ }
+ journal.forceMoveNextFile();
+ commit(tx1);
+ finishCompact();
+ journal.checkReclaimStatus();
+ journal.compact();
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+ public void testCompactFirstFileWithPendingCommits5() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ long ids[] = new long[10];
+ long tx0 = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ ids[i] = idGenerator.generateID();
+ addTx(tx0, ids[i]);
+ }
+ long tx1 = idGenerator.generateID();
- // This operation used to be journal.cleanup(journal.getDataFiles()[0]); when cleanup was still in place
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+ journal.forceMoveNextFile();
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+ journal.forceMoveNextFile();
+ startCompact();
+ System.out.println("Committing TX " + tx1);
+ rollback(tx0);
+ for (int i = 0 ; i < 10; i++)
+ {
+ addTx(tx1, ids[i]);
+ }
+ journal.forceMoveNextFile();
+ commit(tx1);
+ finishCompact();
+ journal.checkReclaimStatus();
- journal.checkReclaimStatus();
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+ public void testCompactFirstFileWithPendingCommits6() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ long ids[] = new long[10];
+ long tx0 = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ ids[i] = idGenerator.generateID();
+ addTx(tx0, ids[i]);
+ }
+ commit(tx0);
+ startCompact();
+ for (int i = 0 ; i < 10; i++)
+ {
+ delete(ids[i]);
+ }
+ finishCompact();
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+ public void testCompactFirstFileWithPendingCommits7() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ long tx0 = idGenerator.generateID();
+ add(idGenerator.generateID());
+ long ids[] = new long[]{idGenerator.generateID(), idGenerator.generateID()};
+ addTx(tx0, ids[0]);
+ addTx(tx0, ids[1]);
+ journal.forceMoveNextFile();
+ commit(tx0);
+ journal.forceMoveNextFile();
+ delete(ids[0]);
+ delete(ids[1]);
+ journal.forceMoveNextFile();
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -13,6 +13,10 @@
package org.hornetq.tests.integration.paging;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.hornetq.api.core.SimpleString;
@@ -58,7 +62,7 @@
AddressSettings defaultSetting = new AddressSettings();
defaultSetting.setPageSizeBytes(10 * 1024);
- defaultSetting.setMaxSizeBytes(100 * 1024);
+ defaultSetting.setMaxSizeBytes(20 * 1024);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
@@ -155,6 +159,113 @@
+ public void testOrderOverTX() throws Exception
+ {
+ HornetQServer server = newHornetQServer();
+ server.start();
+ try
+ {
+ ClientSessionFactory sf;
+ if (isNetty())
+ {
+ sf = createNettyFactory();
+ }
+ else
+ {
+ sf = createInVMFactory();
+ }
+ ClientSession sessionConsumer = sf.createSession(true, true, 0);
+ sessionConsumer.createQueue(PagingSendTest.ADDRESS, PagingSendTest.ADDRESS, null, true);
+ final ClientSession sessionProducer = sf.createSession(false, false);
+ final ClientProducer producer = sessionProducer.createProducer(PagingSendTest.ADDRESS);
+ final AtomicInteger errors = new AtomicInteger(0);
+ final int TOTAL_MESSAGES = 1000;
+ // Consumer will be ready after we have commits
+ final CountDownLatch ready = new CountDownLatch(1);
+ Thread tProducer = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ int commit = 0;
+ for (int i = 0; i < TOTAL_MESSAGES; i++)
+ {
+ ClientMessage msg = sessionProducer.createMessage(true);
+ msg.getBodyBuffer().writeBytes(new byte[1024]);
+ msg.putIntProperty("count", i);
+ producer.send(msg);
+ if (i % 100 == 0 && i > 0)
+ {
+ sessionProducer.commit();
+ if (commit++ > 2)
+ {
+ ready.countDown();
+ }
+ }
+ }
+ sessionProducer.commit();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+ ClientConsumer consumer = sessionConsumer.createConsumer(PagingSendTest.ADDRESS);
+ sessionConsumer.start();
+ tProducer.start();
+ assertTrue(ready.await(10, TimeUnit.SECONDS));
+ for (int i = 0; i < TOTAL_MESSAGES; i++)
+ {
+ ClientMessage msg = consumer.receive(10000);
+ Assert.assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("count").intValue());
+ msg.acknowledge();
+ }
+ tProducer.join();
+ sessionConsumer.close();
+ sessionProducer.close();
+ assertEquals(0, errors.get());
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -119,7 +119,7 @@
- factory = new NIOSequentialFileFactory(dir.getPath());
+ factory = new NIOSequentialFileFactory(dir.getPath(), true);
maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO;
@@ -134,9 +134,6 @@
protected void onCompactLock() throws Exception
- // System.out.println("OnCompactLock");
- journal.forceMoveNextFile(false);
- // System.out.println("OnCompactLock done");
protected void onCompactStart() throws Exception
@@ -152,7 +149,7 @@
long id = idGen.generateID();
journal.appendAddRecord(id, (byte)0, new byte[] { 1, 2, 3 }, false);
- journal.forceMoveNextFile(false);
+ journal.forceMoveNextFile();
journal.appendDeleteRecord(id, id == 20);
// System.out.println("OnCompactStart leave");
@@ -283,6 +280,8 @@
+ journal.checkReclaimStatus();
assertEquals(0, journal.getDataFilesCount());
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
--- trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -116,7 +116,11 @@
if (i % 2 == 0 && i > 0)
System.out.println("DataFiles = " + journal.getDataFilesCount());
+ journal.debugWait();
+ journal.checkReclaimStatus();
if (journal.getDataFilesCount() != 0)
System.out.println("DebugJournal:" + journal.debug());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -463,8 +463,12 @@
AlignedJournalImplTest.log.debug("Expected exception " + e, e);
setupAndLoadJournal(JOURNAL_SIZE, 100);
+ journalImpl.forceMoveNextFile();
+ journalImpl.checkReclaimStatus();
Assert.assertEquals(0, records.size());
Assert.assertEquals(0, transactions.size());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -125,22 +125,6 @@
Assert.assertEquals(nr1, trans2.getNumberOfMessages());
- for (int i = 0; i < nr1; i++)
- {
- trans.decrement();
- }
- Assert.assertEquals(0, trans.getNumberOfMessages());
- try
- {
- trans.decrement();
- Assert.fail("Exception expected!");
- }
- catch (Throwable ignored)
- {
- }
public void testDoubleStart() throws Exception
@@ -1355,6 +1339,20 @@
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deletePageTransactional(long)
+ */
+ public void deletePageTransactional(long recordID) throws Exception
+ {
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#updatePageTransaction(long, org.hornetq.core.paging.PageTransactionInfo, int)
+ */
+ public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception
+ {
+ }
class FakeStoreFactory implements PagingStoreFactory
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -321,6 +321,33 @@
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#isPaging()
+ */
+ public boolean isPaging()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#setPaging(boolean)
+ */
+ public void setPaging(boolean paging)
+ {
+ // TODO Auto-generated method stub
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#isContainsPersistent()
+ */
+ public boolean isContainsPersistent()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
class FakeMessage implements ServerMessage
Modified: trunk/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java
--- trunk/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -30,6 +30,19 @@
private static final Logger log = Logger.getLogger(ReusableLatchTest.class);
+ public void testLatchWithParameterizedDown() throws Exception
+ {
+ ReusableLatch latch = new ReusableLatch(1000);
+ latch.countDown(5000);
+ assertTrue(latch.await(1000));
+ assertEquals(0, latch.getCount());
+ }
public void testLatchOnSingleThread() throws Exception
ReusableLatch latch = new ReusableLatch();
Modified: trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
--- trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -96,8 +96,36 @@
private final String testDir = System.getProperty("java.io.tmpdir", "/tmp") + "/hornetq-unit-test";
// Static --------------------------------------------------------
+ protected static String getUDPDiscoveryAddress()
+ {
+ return System.getProperty("TEST-UDP-ADDRESS", "");
+ }
+ protected static String getUDPDiscoveryAddress(int variant)
+ {
+ String value = getUDPDiscoveryAddress();
+ int posPoint = value.lastIndexOf('.');
+ int last = Integer.valueOf( value.substring(posPoint + 1) );
+ return value.substring(0, posPoint + 1) + (last + variant);
+ }
+ public static int getUDPDiscoveryPort()
+ {
+ return Integer.parseInt(System.getProperty("TEST-UDP-PORT", "6750"));
+ }
+ public static int getUDPDiscoveryPort(final int variant)
+ {
+ return getUDPDiscoveryPort() + 1;
+ }
protected static JournalType getDefaultJournalType()
if (AsynchronousFileImpl.isLoaded())
14 years, 3 months
JBoss hornetq SVN: r9667 - trunk/src/main/org/hornetq/core/registry.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-10 11:23:04 -0400 (Fri, 10 Sep 2010)
New Revision: 9667
Fixing a few tests
Modified: trunk/src/main/org/hornetq/core/registry/JndiBindingRegistry.java
--- trunk/src/main/org/hornetq/core/registry/JndiBindingRegistry.java 2010-09-10 09:48:22 UTC (rev 9666)
+++ trunk/src/main/org/hornetq/core/registry/JndiBindingRegistry.java 2010-09-10 15:23:04 UTC (rev 9667)
@@ -44,7 +44,14 @@
- return context.lookup(name);
+ if (context == null)
+ {
+ return null;
+ }
+ else
+ {
+ return context.lookup(name);
+ }
catch (NamingException e)
@@ -68,7 +75,10 @@
- context.unbind(name);
+ if (context != null)
+ {
+ context.unbind(name);
+ }
catch (NamingException e)
@@ -79,7 +89,10 @@
- context.close();
+ if (context != null)
+ {
+ context.close();
+ }
catch (NamingException e)
14 years, 3 months
JBoss hornetq SVN: r9666 - branches/Branch_2_1/src/main/org/hornetq/jms/persistence/config.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-09-10 05:48:22 -0400 (Fri, 10 Sep 2010)
New Revision: 9666
* use BufferHelper helper methods to read/write String as SimpleString
Modified: branches/Branch_2_1/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java
--- branches/Branch_2_1/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java 2010-09-10 02:21:18 UTC (rev 9665)
+++ branches/Branch_2_1/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java 2010-09-10 09:48:22 UTC (rev 9666)
@@ -14,7 +14,6 @@
package org.hornetq.jms.persistence.config;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.utils.BufferHelper;
import org.hornetq.utils.DataConstants;
@@ -113,8 +112,8 @@
public void encode(final HornetQBuffer buffer)
- buffer.writeSimpleString(SimpleString.toSimpleString(name));
- buffer.writeNullableSimpleString(SimpleString.toSimpleString(selector));
+ BufferHelper.writeAsSimpleString(buffer, name);
+ BufferHelper.writeAsNullableSimpleString(buffer, selector);
@@ -122,8 +121,7 @@
type = PersistedType.getType(buffer.readByte());
name = buffer.readSimpleString().toString();
- SimpleString selectorStr = buffer.readNullableSimpleString();
- selector = (selectorStr == null) ? null : selectorStr.toString();
+ selector = BufferHelper.readNullableSimpleStringAsString(buffer);
durable = buffer.readBoolean();
14 years, 3 months
JBoss hornetq SVN: r9665 - branches/Branch_2_1.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-09 22:21:18 -0400 (Thu, 09 Sep 2010)
New Revision: 9665
Fixing examples for EAP
Modified: branches/Branch_2_1/build-hornetq.xml
--- branches/Branch_2_1/build-hornetq.xml 2010-09-10 00:55:58 UTC (rev 9664)
+++ branches/Branch_2_1/build-hornetq.xml 2010-09-10 02:21:18 UTC (rev 9665)
@@ -1282,17 +1282,194 @@
<copy todir="${build.dir}/eap-examples-tmp">
- <fileset dir="${examples.dir}" excludes="**/build.sh,**/build.bat, **/twitter-connector/**"/>
+ <fileset dir="${examples.dir}" excludes="**/run.bat,**/run.sh,**/twitter-connector/**"/>
<copy todir="${build.dir}/eap-examples-tmp" overwrite="true">
<fileset dir="examples-eap"/>
- <replace dir="${build.dir}/eap-examples-tmp" token="hornetq-ra.rar" value="jms-ra.rar"></replace>
+ <replace dir="${build.dir}/eap-examples-tmp" includes="**/*.java" token="hornetq-ra.rar" value="jms-ra.rar"></replace>
+ <replace dir="${build.dir}/eap-examples-tmp" includes="**/build.xml" token="property name="hornetq.run_script" value="false"" value="property name="hornetq.run_script" value="true""></replace>
+ <property name="build-sh" value="$ANT_HOME/bin/ant "$@""/>
+ <property name="build-bat" value="call ant.bat %*"/>
+ <!-- Fix-me, I couldn't find a better way to do this -->
+ <echo file="${build.dir}/eap-examples-tmp/javaee/xarecovery/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/jca-remote/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/mdb-tx-send/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/mdb-cmt-tx-not-supported/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/jms-bridge/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/mdb-cmt-setrollbackonly/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/ejb-jms-transaction/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/mdb-cmt-tx-required/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/servlet-transport/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/mdb-cmt-tx-local/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/jca-config/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/hajndi/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/mdb-message-selector/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/servlet-ssl/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/mdb-bmt/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/core/microcontainer/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/core/embedded-remote/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/core/perf/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/core/embedded/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/clustered-standalone/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/topic-hierarchies/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/client-kickoff/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/queue-requestor/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/queue/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/jms-bridge/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/topic/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/xa-with-jta/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/message-priority/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/management-notifications/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/transactional/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/delayed-redelivery/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/consumer-rate-limit/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/management/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/large-message/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/transaction-failover/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/scheduled-message/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/bridge/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/durable-subscription/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/queue-selector/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/security/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/producer-rate-limit/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/reattach-node/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/xa-heuristic/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/last-value-queue/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/instantiate-connection-factory/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/request-reply/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/topic-selector-example2/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/temp-queue/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/ssl-enabled/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/static-selector/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/interceptor/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/applet/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/pre-acknowledge/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/expiry/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/browser/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/queue-message-redistribution/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/clustered-queue/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/topic-selector-example1/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/client-side-load-balancing/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/perf/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/message-group/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/stomp/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/http-transport/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/no-consumer-buffering/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/send-acknowledgements/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/symmetric-cluster/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/application-layer-failover/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/clustered-topic/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/static-selector-jms/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/embedded/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/clustered-durable-subscription/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/xa-receive/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/jmx/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/jaas/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/dead-letter/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/non-transaction-failover/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/stomp-websockets/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/xa-send/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/divert/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/paging/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/message-group2/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/clustered-grouping/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/message-counters/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/soak/tx-restarts/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/common/build.sh" message="${build-sh}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/xarecovery/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/jca-remote/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/mdb-tx-send/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/mdb-cmt-tx-not-supported/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/jms-bridge/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/mdb-cmt-setrollbackonly/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/ejb-jms-transaction/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/mdb-cmt-tx-required/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/servlet-transport/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/mdb-cmt-tx-local/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/jca-config/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/hajndi/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/mdb-message-selector/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/servlet-ssl/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/javaee/mdb-bmt/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/core/microcontainer/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/core/embedded-remote/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/core/perf/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/core/embedded/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/clustered-standalone/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/topic-hierarchies/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/client-kickoff/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/queue-requestor/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/queue/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/jms-bridge/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/topic/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/xa-with-jta/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/message-priority/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/management-notifications/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/transactional/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/delayed-redelivery/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/consumer-rate-limit/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/management/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/large-message/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/transaction-failover/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/scheduled-message/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/bridge/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/durable-subscription/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/queue-selector/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/security/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/producer-rate-limit/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/reattach-node/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/xa-heuristic/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/last-value-queue/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/instantiate-connection-factory/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/request-reply/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/topic-selector-example2/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/temp-queue/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/ssl-enabled/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/static-selector/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/interceptor/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/applet/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/pre-acknowledge/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/expiry/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/browser/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/queue-message-redistribution/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/clustered-queue/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/topic-selector-example1/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/client-side-load-balancing/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/perf/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/message-group/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/stomp/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/http-transport/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/no-consumer-buffering/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/send-acknowledgements/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/symmetric-cluster/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/application-layer-failover/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/clustered-topic/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/static-selector-jms/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/embedded/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/clustered-durable-subscription/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/xa-receive/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/jmx/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/jaas/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/dead-letter/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/non-transaction-failover/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/stomp-websockets/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/xa-send/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/divert/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/paging/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/message-group2/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/clustered-grouping/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/jms/message-counters/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/soak/tx-restarts/build.bat" message="${build-bat}"/>
+ <echo file="${build.dir}/eap-examples-tmp/common/build.bat" message="${build-bat}"/>
<zip destfile="${build.jars.dir}/${eap.examples.zip.name}">
- <zipfileset dir="${build.dir}/eap-examples-tmp" prefix="examples/hornetq"/>
+ <zipfileset dir="${build.dir}/eap-examples-tmp" prefix="hornetq-examples"/>
<delete dir="${build.dir}/eap-examples-tmp"/>
14 years, 3 months
JBoss hornetq SVN: r9664 - in trunk: src/main/org/hornetq/core/registry and 7 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-09 20:55:58 -0400 (Thu, 09 Sep 2010)
New Revision: 9664
Fixing ManualReconnectionToSingleServerTest as the context was not being reapplied to the new BindingRegistry introduced by Bill Burke.
Modified: trunk/.classpath
--- trunk/.classpath 2010-09-09 17:43:28 UTC (rev 9663)
+++ trunk/.classpath 2010-09-10 00:55:58 UTC (rev 9664)
@@ -17,7 +17,7 @@
<classpathentry kind="src" path="examples/core/microcontainer/src"/>
<classpathentry kind="src" path="examples/core/embedded-remote/src"/>
<classpathentry kind="src" path="examples/core/perf/src"/>
- <classpathentry kind="src" path="examples/core/twitter-connector/src"/>
+ <classpathentry kind="src" path="examples/core/twitter-connector/src"/>
<classpathentry kind="src" path="examples/jms/applet/src"/>
<classpathentry kind="src" path="examples/jms/application-layer-failover/src"/>
<classpathentry kind="src" path="examples/jms/bridge/src"/>
@@ -126,5 +126,12 @@
<classpathentry kind="lib" path="thirdparty/wutka-dtdparser/lib/dtdparser121.jar"/>
<classpathentry kind="lib" path="thirdparty/org/jboss/ejb3/lib/jboss-ejb3-ext-api.jar"/>
<classpathentry kind="lib" path="thirdparty/org/twitter4j/lib/twitter4j-core.jar"/>
+ <classpathentry kind="lib" path="thirdparty/org/springframework/lib/spring-asm.jar"/>
+ <classpathentry kind="lib" path="thirdparty/org/springframework/lib/spring-beans.jar"/>
+ <classpathentry kind="lib" path="thirdparty/org/springframework/lib/spring-context.jar"/>
+ <classpathentry kind="lib" path="thirdparty/org/springframework/lib/spring-core.jar"/>
+ <classpathentry kind="lib" path="thirdparty/org/springframework/lib/spring-expression.jar"/>
+ <classpathentry kind="lib" path="thirdparty/org/springframework/lib/spring-jms.jar"/>
+ <classpathentry kind="lib" path="thirdparty/org/springframework/lib/spring-tx.jar"/>
<classpathentry kind="output" path="eclipse-output"/>
Modified: trunk/src/main/org/hornetq/core/registry/JndiBindingRegistry.java
--- trunk/src/main/org/hornetq/core/registry/JndiBindingRegistry.java 2010-09-09 17:43:28 UTC (rev 9663)
+++ trunk/src/main/org/hornetq/core/registry/JndiBindingRegistry.java 2010-09-10 00:55:58 UTC (rev 9664)
@@ -1,6 +1,6 @@
package org.hornetq.core.registry;
-import org.hornetq.spi.BindingRegistry;
+import org.hornetq.spi.core.naming.BindingRegistry;
import javax.naming.Context;
import javax.naming.InitialContext;
@@ -14,6 +14,22 @@
private Context context;
+ /**
+ * @return the context
+ */
+ public Object getContext()
+ {
+ return context;
+ }
+ /**
+ * @param context the context to set
+ */
+ public void setContext(Object context)
+ {
+ this.context = (Context)context;
+ }
public JndiBindingRegistry(Context context)
this.context = context;
Modified: trunk/src/main/org/hornetq/core/registry/MapBindingRegistry.java
--- trunk/src/main/org/hornetq/core/registry/MapBindingRegistry.java 2010-09-09 17:43:28 UTC (rev 9663)
+++ trunk/src/main/org/hornetq/core/registry/MapBindingRegistry.java 2010-09-10 00:55:58 UTC (rev 9664)
@@ -1,15 +1,17 @@
package org.hornetq.core.registry;
-import org.hornetq.spi.BindingRegistry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.hornetq.spi.core.naming.BindingRegistry;
* @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
* @version $Revision: 1 $
public class MapBindingRegistry implements BindingRegistry
- protected ConcurrentHashMap<String, Object> registry = new ConcurrentHashMap<String, Object>();
+ protected ConcurrentMap<String, Object> registry = new ConcurrentHashMap<String, Object>();
public Object lookup(String name)
@@ -29,4 +31,17 @@
public void close()
+ public Object getContext()
+ {
+ return registry;
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.naming.BindingRegistry#setContext(java.lang.Object)
+ */
+ public void setContext(Object ctx)
+ {
+ registry = (ConcurrentMap)ctx;
+ }
Modified: trunk/src/main/org/hornetq/integration/spring/SpringBindingRegistry.java
--- trunk/src/main/org/hornetq/integration/spring/SpringBindingRegistry.java 2010-09-09 17:43:28 UTC (rev 9663)
+++ trunk/src/main/org/hornetq/integration/spring/SpringBindingRegistry.java 2010-09-10 00:55:58 UTC (rev 9664)
@@ -1,6 +1,6 @@
package org.hornetq.integration.spring;
-import org.hornetq.spi.BindingRegistry;
+import org.hornetq.spi.core.naming.BindingRegistry;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@@ -34,4 +34,20 @@
public void close()
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.naming.BindingRegistry#getContext()
+ */
+ public Object getContext()
+ {
+ return this.factory;
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.naming.BindingRegistry#setContext(java.lang.Object)
+ */
+ public void setContext(Object ctx)
+ {
+ this.factory = (ConfigurableBeanFactory) ctx;
+ }
Modified: trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
--- trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-09-09 17:43:28 UTC (rev 9663)
+++ trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-09-10 00:55:58 UTC (rev 9664)
@@ -25,7 +25,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
-import org.hornetq.spi.BindingRegistry;
+import org.hornetq.spi.core.naming.BindingRegistry;
* The JMS Management interface.
Modified: trunk/src/main/org/hornetq/jms/server/embedded/EmbeddedJMS.java
--- trunk/src/main/org/hornetq/jms/server/embedded/EmbeddedJMS.java 2010-09-09 17:43:28 UTC (rev 9663)
+++ trunk/src/main/org/hornetq/jms/server/embedded/EmbeddedJMS.java 2010-09-10 00:55:58 UTC (rev 9664)
@@ -5,7 +5,7 @@
import org.hornetq.core.server.embedded.EmbeddedHornetQ;
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
-import org.hornetq.spi.BindingRegistry;
+import org.hornetq.spi.core.naming.BindingRegistry;
import javax.naming.Context;
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-09-09 17:43:28 UTC (rev 9663)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-09-10 00:55:58 UTC (rev 9664)
@@ -66,7 +66,7 @@
import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.hornetq.jms.server.management.JMSManagementService;
import org.hornetq.jms.server.management.impl.JMSManagementServiceImpl;
-import org.hornetq.spi.BindingRegistry;
+import org.hornetq.spi.core.naming.BindingRegistry;
import org.hornetq.utils.TimeAndCounterIDGenerator;
@@ -355,6 +355,10 @@
public synchronized void setContext(final Context context)
this.context = context;
+ if (registry != null && registry instanceof JndiBindingRegistry)
+ {
+ registry.setContext(context);
+ }
contextSet = true;
Deleted: trunk/src/main/org/hornetq/spi/BindingRegistry.java
--- trunk/src/main/org/hornetq/spi/BindingRegistry.java 2010-09-09 17:43:28 UTC (rev 9663)
+++ trunk/src/main/org/hornetq/spi/BindingRegistry.java 2010-09-10 00:55:58 UTC (rev 9664)
@@ -1,15 +0,0 @@
-package org.hornetq.spi;
- * Abstract interface for a registry to store endpoints like connection factories into.
- *
- * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
- * @version $Revision: 1 $
- */
-public interface BindingRegistry
- Object lookup(String name);
- boolean bind(String name, Object obj);
- void unbind(String name);
- void close();
Copied: trunk/src/main/org/hornetq/spi/core/naming/BindingRegistry.java (from rev 9663, trunk/src/main/org/hornetq/spi/BindingRegistry.java)
--- trunk/src/main/org/hornetq/spi/core/naming/BindingRegistry.java (rev 0)
+++ trunk/src/main/org/hornetq/spi/core/naming/BindingRegistry.java 2010-09-10 00:55:58 UTC (rev 9664)
@@ -0,0 +1,28 @@
+package org.hornetq.spi.core.naming;
+ * Abstract interface for a registry to store endpoints like connection factories into.
+ *
+ * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
+ * @version $Revision: 1 $
+ */
+public interface BindingRegistry
+ /** The context used by the registry.
+ * This may be used to setup the JNDI Context on the JNDI Registry.
+ * We keep it as an object here as the interface needs to be generic
+ * as this could be reused by others Registries (e.g set/get the Map on MapRegistry)
+ * @return
+ */
+ Object getContext();
+ void setContext(Object ctx);
+ Object lookup(String name);
+ boolean bind(String name, Object obj);
+ void unbind(String name);
+ void close();
14 years, 3 months
JBoss hornetq SVN: r9663 - branches/Branch_2_1/src/main/org/hornetq/jms/persistence/config.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-09 13:43:28 -0400 (Thu, 09 Sep 2010)
New Revision: 9663
HORNETQ-503 - Fix done by Jeff Mesnil (I'm just applying the patch and committing it)
Modified: branches/Branch_2_1/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java
--- branches/Branch_2_1/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java 2010-09-09 13:00:04 UTC (rev 9662)
+++ branches/Branch_2_1/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java 2010-09-09 17:43:28 UTC (rev 9663)
@@ -14,6 +14,7 @@
package org.hornetq.jms.persistence.config;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.utils.BufferHelper;
import org.hornetq.utils.DataConstants;
@@ -112,16 +113,17 @@
public void encode(final HornetQBuffer buffer)
- buffer.writeString(name);
- buffer.writeNullableString(selector);
+ buffer.writeSimpleString(SimpleString.toSimpleString(name));
+ buffer.writeNullableSimpleString(SimpleString.toSimpleString(selector));
public void decode(final HornetQBuffer buffer)
type = PersistedType.getType(buffer.readByte());
- name = buffer.readString();
- selector = buffer.readNullableString();
+ name = buffer.readSimpleString().toString();
+ SimpleString selectorStr = buffer.readNullableSimpleString();
+ selector = (selectorStr == null) ? null : selectorStr.toString();
durable = buffer.readBoolean();
14 years, 3 months
JBoss hornetq SVN: r9662 - branches/Branch_2_1/tests/src/org/hornetq/tests/integration/jms/server/management.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-09-09 09:00:04 -0400 (Thu, 09 Sep 2010)
New Revision: 9662
* update test to use a really new JMS server for restart to ensure nothing is loaded from memory
* add test to use JMS management message to create dynamically the queue
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlRestartTest.java
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlRestartTest.java 2010-09-09 06:56:01 UTC (rev 9661)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlRestartTest.java 2010-09-09 13:00:04 UTC (rev 9662)
@@ -13,27 +13,36 @@
package org.hornetq.tests.integration.jms.server.management;
+import java.io.File;
+import javax.jms.Connection;
+import javax.jms.Message;
import javax.jms.Queue;
+import javax.jms.QueueRequestor;
+import javax.jms.QueueSession;
+import javax.jms.Session;
import junit.framework.Assert;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ObjectNameBuilder;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.management.JMSManagementHelper;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
-import org.hornetq.jms.persistence.JMSStorageManager;
-import org.hornetq.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.integration.management.ManagementControlHelper;
import org.hornetq.tests.integration.management.ManagementTestBase;
import org.hornetq.tests.unit.util.InVMContext;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.TimeAndCounterIDGenerator;
* A JMSServerControlRestartTest
@@ -51,17 +60,15 @@
protected InVMContext context;
- private HornetQServer server;
+ private JMSServerManager serverManager;
- private JMSServerManagerImpl serverManager;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
- public void testCreateDurableQueueAndRestartServer() throws Exception
+ public void testCreateDurableQueueUsingJMXAndRestartServer() throws Exception
String queueName = RandomUtil.randomString();
String binding = RandomUtil.randomString();
@@ -79,10 +86,52 @@
+ checkNoBinding(context, binding);
+ checkNoResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
+ serverManager = createJMSServer();
+ serverManager.start();
+ o = UnitTestCase.checkBinding(context, binding);
+ Assert.assertTrue(o instanceof Queue);
+ queue = (Queue)o;
+ assertEquals(queueName, queue.getQueueName());
+ checkResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
+ }
+ public void testCreateDurableQueueUsingJMSAndRestartServer() throws Exception
+ {
+ String queueName = RandomUtil.randomString();
+ String binding = RandomUtil.randomString();
+ UnitTestCase.checkNoBinding(context, binding);
+ checkNoResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
+ TransportConfiguration config = new TransportConfiguration(InVMConnectorFactory.class.getName());
+ Connection connection = HornetQJMSClient.createConnectionFactory(config).createConnection();
+ connection.start();
+ Queue managementQueue = HornetQJMSClient.createQueue("hornetq.management");
+ QueueSession session = (QueueSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueRequestor requestor = new QueueRequestor(session, managementQueue);
+ Message message = session.createMessage();
+ JMSManagementHelper.putOperationInvocation(message, "jms.server", "createQueue", queueName, binding);
+ Message reply = requestor.request(message);
+ assertTrue(JMSManagementHelper.hasOperationSucceeded(reply));
+ connection.close();
+ Object o = UnitTestCase.checkBinding(context, binding);
+ Assert.assertTrue(o instanceof Queue);
+ Queue queue = (Queue)o;
+ assertEquals(queueName, queue.getQueueName());
+ checkResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
+ serverManager.stop();
checkNoBinding(context, binding);
+ serverManager = createJMSServer();
o = UnitTestCase.checkBinding(context, binding);
@@ -101,35 +150,35 @@
+ serverManager = createJMSServer();
+ serverManager.start();
+ }
+ private JMSServerManager createJMSServer() throws Exception {
Configuration conf = new ConfigurationImpl();
+ conf.setPersistenceEnabled(true);
+ conf.setJournalType(JournalType.NIO);
conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
- server = HornetQServers.newHornetQServer(conf, mbeanServer, false);
+ HornetQServer server = HornetQServers.newHornetQServer(conf, mbeanServer);
context = new InVMContext();
- JMSStorageManager storage = new JMSJournalStorageManagerImpl(new TimeAndCounterIDGenerator(),
- server.getConfiguration(),
- server.getReplicationManager());
- serverManager = new JMSServerManagerImpl(server, null, storage);
+ serverManager = new JMSServerManagerImpl(server);
- serverManager.start();
- serverManager.activated();
+ return serverManager;
protected void tearDown() throws Exception
+ String bindingDir = serverManager.getHornetQServer().getConfiguration().getBindingsDirectory();
- server.stop();
serverManager = null;
+ System.out.println(bindingDir);
+ deleteDirectory(new File(bindingDir));
- server = null;
14 years, 3 months
JBoss hornetq SVN: r9661 - branches/Branch_2_1.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-09 02:56:01 -0400 (Thu, 09 Sep 2010)
New Revision: 9661
EAP doesn't want the twitter example (not until 2.2 at least), removing it from the list
Modified: branches/Branch_2_1/build-hornetq.xml
--- branches/Branch_2_1/build-hornetq.xml 2010-09-09 06:48:25 UTC (rev 9660)
+++ branches/Branch_2_1/build-hornetq.xml 2010-09-09 06:56:01 UTC (rev 9661)
@@ -1282,7 +1282,7 @@
<copy todir="${build.dir}/eap-examples-tmp">
- <fileset dir="${examples.dir}" excludes="**/build.sh,**/build.bat"/>
+ <fileset dir="${examples.dir}" excludes="**/build.sh,**/build.bat, **/twitter-connector/**"/>
<copy todir="${build.dir}/eap-examples-tmp" overwrite="true">
14 years, 3 months
JBoss hornetq SVN: r9660 - branches/Branch_2_1.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-09 02:48:25 -0400 (Thu, 09 Sep 2010)
New Revision: 9660
fixing patch on twitter
Modified: branches/Branch_2_1/remove-twitter.patch
--- branches/Branch_2_1/remove-twitter.patch 2010-09-09 06:32:10 UTC (rev 9659)
+++ branches/Branch_2_1/remove-twitter.patch 2010-09-09 06:48:25 UTC (rev 9660)
@@ -1,8 +1,6 @@
-#This file is used to remove twitter support from HornetQ
-#As EAP doesn't have twitter4j.jar certified for the build
Index: src/main/org/hornetq/integration/twitter/TwitterConstants.java
---- src/main/org/hornetq/integration/twitter/TwitterConstants.java (revision 9563)
+--- src/main/org/hornetq/integration/twitter/TwitterConstants.java (revision 9658)
+++ src/main/org/hornetq/integration/twitter/TwitterConstants.java (working copy)
@@ -1,81 +0,0 @@
@@ -88,7 +86,7 @@
Index: src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java
---- src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java (revision 9563)
+--- src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java (revision 9658)
+++ src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java (working copy)
@@ -1,202 +0,0 @@
@@ -295,7 +293,7 @@
Index: src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java
---- src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java (revision 9563)
+--- src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java (revision 9658)
+++ src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java (working copy)
@@ -1,213 +0,0 @@
@@ -513,7 +511,7 @@
Index: src/main/org/hornetq/integration/twitter/TwitterOutgoingConnectorServiceFactory.java
---- src/main/org/hornetq/integration/twitter/TwitterOutgoingConnectorServiceFactory.java (revision 9563)
+--- src/main/org/hornetq/integration/twitter/TwitterOutgoingConnectorServiceFactory.java (revision 9658)
+++ src/main/org/hornetq/integration/twitter/TwitterOutgoingConnectorServiceFactory.java (working copy)
@@ -1,45 +0,0 @@
@@ -563,7 +561,7 @@
Index: src/main/org/hornetq/integration/twitter/TwitterIncomingConnectorServiceFactory.java
---- src/main/org/hornetq/integration/twitter/TwitterIncomingConnectorServiceFactory.java (revision 9563)
+--- src/main/org/hornetq/integration/twitter/TwitterIncomingConnectorServiceFactory.java (revision 9658)
+++ src/main/org/hornetq/integration/twitter/TwitterIncomingConnectorServiceFactory.java (working copy)
@@ -1,48 +0,0 @@
@@ -616,7 +614,7 @@
Index: pom.xml
---- pom.xml (revision 9563)
+--- pom.xml (revision 9658)
+++ pom.xml (working copy)
@@ -236,11 +236,6 @@
@@ -632,7 +630,7 @@
Index: tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java
---- tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java (revision 9563)
+--- tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java (revision 9658)
+++ tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java (working copy)
@@ -1,511 +0,0 @@
@@ -1148,7 +1146,7 @@
Index: build-hornetq.xml
---- build-hornetq.xml (revision 9563)
+--- build-hornetq.xml (revision 9659)
+++ build-hornetq.xml (working copy)
@@ -65,8 +65,8 @@
<property name="jnp.client.jar.name" value="jnp-client.jar"/>
@@ -1166,11 +1164,11 @@
<property name="resources.jar.name" value="hornetq-resources.jar"/>
<property name="resources.sources.jar.name" value="hornetq-resources-sources.jar"/>
- <property name="twitter4j.jar.name" value="twitter4j-core.jar"/>
-+ <!-- <property name="twitter4j.jar.name" value="twitter4j-core.jar"/> -->
++ <!-- <property name="twitter4j.jar.name" value="twitter4j-core.jar"/> -->
+ <property name="eap.examples.zip.name" value="hornetq-eap-examples.zip"/>
<!--source and build dirs-->
- <property name="build.dir" value="build"/>
-@@ -94,7 +94,7 @@
+@@ -95,7 +95,7 @@
<property name="build.jms.classes.dir" value="${build.dir}/classes/jms"/>
<property name="build.jms.java5.classes.dir" value="${build.dir}/classes/jms-java5"/>
<property name="build.jboss.integration.classes.dir" value="${build.dir}/classes/jboss-integration"/>
@@ -1179,7 +1177,7 @@
<property name="build.service.classes.dir" value="${build.dir}/classes/service"/>
<property name="build.bootstrap.classes.dir" value="${build.dir}/classes/bootstrap"/>
<property name="build.logging.classes.dir" value="${build.dir}/classes/logging"/>
-@@ -213,10 +213,10 @@
+@@ -214,10 +214,10 @@
<path refid="org.jboss.javaee.classpath"/>
@@ -1192,7 +1190,7 @@
<path id="jboss.service.compilation.classpath">
<path refid="org.jboss.javaee.classpath"/>
-@@ -246,13 +246,13 @@
+@@ -247,13 +247,13 @@
<path refid="jboss.integration.compilation.classpath"/>
<path refid="bootstrap.compilation.classpath"/>
<path refid="junit.junit.classpath"/>
@@ -1208,7 +1206,7 @@
<path id="jms.test.compilation.classpath">
-@@ -305,7 +305,7 @@
+@@ -306,7 +306,7 @@
<!-- we must include Apache commons logging -->
<!-- as a transitive dependency from JBoss TM -->
<path refid="apache.logging.classpath"/>
@@ -1217,7 +1215,7 @@
<path id="emma.unit.test.execution.classpath">
-@@ -382,7 +382,7 @@
+@@ -383,7 +383,7 @@
<mkdir dir="${build.jms.classes.dir}"/>
<mkdir dir="${build.jms.java5.classes.dir}"/>
<mkdir dir="${build.jboss.integration.classes.dir}"/>
@@ -1226,7 +1224,7 @@
<mkdir dir="${build.service.classes.dir}"/>
<mkdir dir="${build.bootstrap.classes.dir}"/>
<mkdir dir="${build.logging.classes.dir}"/>
-@@ -560,7 +560,7 @@
+@@ -561,7 +561,7 @@
@@ -1235,7 +1233,7 @@
<javac destdir="${build.twitter.integration.classes.dir}"
-@@ -578,7 +578,7 @@
+@@ -579,7 +579,7 @@
<include name="org/hornetq/integration/twitter/**/*.java"/>
<classpath refid="twitter.integration.compilation.classpath"/>
@@ -1244,29 +1242,21 @@
<!-- author: Lucas Amador -->
<target name="compile-jboss-service" depends="compile-core">
-@@ -719,12 +719,19 @@
- <!-- Jar Targets -->
+@@ -721,11 +721,11 @@
<!-- ======================================================================================== -->
-+ <!-- <target name="sources-jar" description="create jar files containing source code"
-+ depends="jar-core-sources, jar-core-client-sources, jar-core-client-java5-sources, jar-jms-sources, jar-jms-client-sources, jar-jms-client-java5-sources, jar-jboss-integration-sources, jar-jboss-service-sources, jar-bootstrap-sources, jar-logging-sources, jar-ra-sources, jar-resources-sources, jar-twitter-integration-sources">
-+ </target> -->
<target name="sources-jar" description="create jar files containing source code"
- depends="jar-core-sources, jar-core-client-sources, jar-core-client-java5-sources, jar-jms-sources, jar-jms-client-sources, jar-jms-client-java5-sources, jar-jboss-integration-sources, jar-jboss-service-sources, jar-bootstrap-sources, jar-logging-sources, jar-ra-sources, jar-resources-sources, jar-twitter-integration-sources">
+ depends="jar-core-sources, jar-core-client-sources, jar-core-client-java5-sources, jar-jms-sources, jar-jms-client-sources, jar-jms-client-java5-sources, jar-jboss-integration-sources, jar-jboss-service-sources, jar-bootstrap-sources, jar-logging-sources, jar-ra-sources, jar-resources-sources">
-+ <!-- <target name="jar"
-+ depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms, jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar, jar-twitter-integration">
-+ </target> -->
<target name="jar"
-- depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms, jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar, jar-twitter-integration">
-+ depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms, jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar">
+- depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms, jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar, jar-twitter-integration, eap-examples">
++ depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms, jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar, eap-examples">
<target name="jar-jnp-client" depends="init">
-@@ -868,7 +875,7 @@
+@@ -869,7 +869,7 @@
@@ -1275,7 +1265,7 @@
<jar jarfile="${build.jars.dir}/${twitter.integration.jar.name}">
<fileset dir="${build.twitter.integration.classes.dir}" includes="**"/>
-@@ -882,7 +889,7 @@
+@@ -883,7 +883,7 @@
<include name="org/hornetq/integration/twitter/**/*.java"/>
@@ -1284,7 +1274,7 @@
<!-- author: Lucas Amador -->
<target name="jar-jboss-service" depends="compile-jboss-service">
-@@ -1148,7 +1155,7 @@
+@@ -1149,7 +1149,7 @@
<include name="${jms.client.jar.name}"/>
<include name="${jms.client.java5.jar.name}"/>
<include name="${jnp.client.jar.name}"/>
@@ -1293,7 +1283,7 @@
<fileset dir="${org.jboss.naming.lib}">
<include name="jnpserver.jar"/>
-@@ -1158,7 +1165,7 @@
+@@ -1159,7 +1159,7 @@
<copy file="${org.jboss.netty.lib}/${netty.jar.name}" tofile="${build.distro.lib.dir}/netty.jar"/>
@@ -1302,3 +1292,12 @@
<copy todir="${build.distro.config.dir}">
<fileset dir="${src.config.dir}">
<include name="*.xml"/>
+@@ -1563,8 +1563,6 @@
+ timeout="${junit.timeout}">
+ <sysproperty key="user.home" value="${user.home}"/>
+ <sysproperty key="java.io.tmpdir" value="${java.io.tmpdir}"/>
+- <sysproperty key="twitter.username" value="${twitter.username}"/>
+- <sysproperty key="twitter.password" value="${twitter.password}"/>
+ <jvmarg value="-Djava.library.path=native/bin"/>
+ <jvmarg value="-Dmodule.output=./"/>
+ <jvmarg value="-Djava.util.logging.config.file=src/config/trunk/non-clustered/logging.properties"/>
14 years, 3 months
JBoss hornetq SVN: r9659 - in branches/Branch_2_1: examples/common and 12 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-09 02:32:10 -0400 (Thu, 09 Sep 2010)
New Revision: 9659
Making the examples runnable with EAP (commit 1)
Modified: branches/Branch_2_1/build-hornetq.xml
--- branches/Branch_2_1/build-hornetq.xml 2010-09-08 22:11:16 UTC (rev 9658)
+++ branches/Branch_2_1/build-hornetq.xml 2010-09-09 06:32:10 UTC (rev 9659)
@@ -85,6 +85,7 @@
<property name="resources.jar.name" value="hornetq-resources.jar"/>
<property name="resources.sources.jar.name" value="hornetq-resources-sources.jar"/>
<property name="twitter4j.jar.name" value="twitter4j-core.jar"/>
+ <property name="eap.examples.zip.name" value="hornetq-eap-examples.zip"/>
<!--source and build dirs-->
<property name="build.dir" value="build"/>
@@ -724,7 +725,7 @@
<target name="jar"
- depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms, jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar, jar-twitter-integration">
+ depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms, jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar, jar-twitter-integration, eap-examples">
<target name="jar-jnp-client" depends="init">
@@ -1275,8 +1276,30 @@
<gzip src="${build.dir}/${build.artifact}.tar"
+ <target name="eap-examples" description="Generates a file with examples tuned for the JBoss EAP" depends="init">
+ <mkdir dir="${build.dir}/eap-examples-tmp"/>
+ <copy todir="${build.dir}/eap-examples-tmp">
+ <fileset dir="${examples.dir}" excludes="**/build.sh,**/build.bat"/>
+ </copy>
+ <copy todir="${build.dir}/eap-examples-tmp" overwrite="true">
+ <fileset dir="examples-eap"/>
+ </copy>
+ <replace dir="${build.dir}/eap-examples-tmp" token="hornetq-ra.rar" value="jms-ra.rar"></replace>
+ <zip destfile="${build.jars.dir}/${eap.examples.zip.name}">
+ <zipfileset dir="${build.dir}/eap-examples-tmp" prefix="examples/hornetq"/>
+ </zip>
+ <delete dir="${build.dir}/eap-examples-tmp"/>
+ </target>
<target name="source-distro">
<mkdir dir="${build.dir}"/>
<zip destfile="${build.dir}/${build.artifact}-src.zip">
Modified: branches/Branch_2_1/examples/common/build.xml
--- branches/Branch_2_1/examples/common/build.xml 2010-09-08 22:11:16 UTC (rev 9658)
+++ branches/Branch_2_1/examples/common/build.xml 2010-09-09 06:32:10 UTC (rev 9659)
@@ -26,6 +26,7 @@
+ <property environment="ENV"/>
<dirname property="imported.basedir" file="${ant.file.example}"/>
<property file="${imported.basedir}/config/ant.properties"/>
@@ -60,21 +61,25 @@
<path id="compilation.classpath">
<fileset dir="${hornetq.jars.dir}">
- <include name="**/*.jar"/>
+ <include name="**/hornetq*.jar"/>
<fileset dir="${jars.dir}">
- <include name="**/jboss-jms-api.jar"/>
+ <include name="**/${jms-library}"/>
+ <include name="**/jboss-kernel.jar"/>
<path refid="extra.classpath"/>
<path id="client.compilation.classpath">
<fileset dir="${hornetq.jars.dir}">
+ <include name="**/jnpserver.jar"/>
<include name="**/*client*.jar"/>
- <include name="**/jboss-jms-api.jar"/>
+ <include name="**/${jms-library}"/>
+ <include name="**/netty.jar"/>
<fileset dir="${jars.dir}">
- <include name="**/jboss-jms-api.jar"/>
+ <include name="**/jnpserver.jar"/>
+ <include name="**/${jms-library}"/>
<include name="**/netty.jar"/>
<path refid="extra.classpath"/>
@@ -86,10 +91,12 @@
<pathelement location="${example.config.dir}"/>
<pathelement location="${classes.dir}"/>
<fileset dir="${hornetq.jars.dir}">
- <include name="**/*client*.jar"/>
+ <include name="**/hornetq*.jar"/>
<fileset dir="${jars.dir}">
- <include name="**/netty*.jar"/>
+ <include name="**/jboss-logging-spi.jar"/>
+ <include name="**/${jms-library}"/>
+ <include name="**/jboss-kernel.jar"/>
@@ -98,10 +105,21 @@
<pathelement location="${config.dir}"/>
<pathelement location="${example.config.dir}"/>
<pathelement location="${classes.dir}"/>
+ <fileset dir="${hornetq.jars.dir}">
+ <include name="**/jnpserver.jar"/>
+ <include name="**/netty*.jar"/>
+ </fileset>
<fileset dir="${jars.dir}">
- <include name="org/jboss/naming/lib/jnpserver.jar"/>
- <include name="org/jboss/netty/lib/netty*.jar"/>
- <include name="org/twitter4j/lib/twitter4j*.jar"/>
+ <include name="**/jboss-logging-spi.jar"/>
+ <include name="**/jnpserver.jar"/>
+ <include name="**/netty*.jar"/>
+ <include name="**/jboss-reflect.jar"/>
+ <include name="**/jboss-common-core.jar"/>
+ <include name="**/jboss-dependency.jar"/>
+ <include name="**/jboss-mdr.jar"/>
+ <include name="**/jbossxb.jar"/>
+ <include name="**/jboss-xml-binding.jar"/>
+ <include name="**/twitter4j*.jar"/>
@@ -137,7 +155,7 @@
<property name="serverclasspath" refid="server.classpath"/>
<property name="clientClasspath" refid="client.classpath"/>
- <!--<echo>client classpath = ${clientClasspath}</echo>-->
+ <!-- <echo>server classpath = ${serverclasspath}</echo> -->
<property file="${imported.basedir}/config/server.properties"/>
<java classname="${example.classname}" fork="true" resultproperty="example-result">
<jvmarg value="-Dhornetq.example.server.classpath=${serverclasspath}"/>
Modified: branches/Branch_2_1/examples/common/config/ant.properties
--- branches/Branch_2_1/examples/common/config/ant.properties 2010-09-08 22:11:16 UTC (rev 9658)
+++ branches/Branch_2_1/examples/common/config/ant.properties 2010-09-09 06:32:10 UTC (rev 9659)
@@ -2,3 +2,4 @@
Modified: branches/Branch_2_1/examples/core/embedded/build.xml
--- branches/Branch_2_1/examples/core/embedded/build.xml 2010-09-08 22:11:16 UTC (rev 9658)
+++ branches/Branch_2_1/examples/core/embedded/build.xml 2010-09-09 06:32:10 UTC (rev 9659)
@@ -23,6 +23,7 @@
<path id="local.classpath">
<fileset dir="${hornetq.jars.dir}">
<include name="hornetq-core.jar"/>
+ <include name="**/netty*jar"/>
<fileset dir="${jars.dir}">
<include name="**/netty*jar"/>
Modified: branches/Branch_2_1/examples/core/embedded-remote/build.xml
--- branches/Branch_2_1/examples/core/embedded-remote/build.xml 2010-09-08 22:11:16 UTC (rev 9658)
+++ branches/Branch_2_1/examples/core/embedded-remote/build.xml 2010-09-09 06:32:10 UTC (rev 9659)
@@ -22,6 +22,7 @@
<path id="remote.classpath">
<fileset dir="${hornetq.jars.dir}">
<include name="hornetq-core.jar"/>
+ <include name="**/netty*jar"/>
<fileset dir="${jars.dir}">
<include name="**/netty*jar"/>
Modified: branches/Branch_2_1/examples/core/microcontainer/build.xml
--- branches/Branch_2_1/examples/core/microcontainer/build.xml 2010-09-08 22:11:16 UTC (rev 9658)
+++ branches/Branch_2_1/examples/core/microcontainer/build.xml 2010-09-09 06:32:10 UTC (rev 9659)
@@ -25,16 +25,36 @@
<include name="hornetq-core.jar"/>
<include name="hornetq-bootstrap.jar"/>
<include name="hornetq-jbossas-security.jar"/>
+ <include name="**/netty*jar"/>
+ <include name="**/jboss-kernel.jar"/>
+ <include name="**/jboss-mc.jar"/>
+ <include name="**/jboss-logging-spi.jar"/>
+ <include name="**/jboss-reflect.jar"/>
+ <include name="**/jboss-common-core.jar"/>
+ <include name="**/jboss-dependency.jar"/>
+ <include name="**/jboss-mdr.jar"/>
+ <include name="**/jboss-xml-binding.jar"/>
- <fileset dir="${hornetq.jars.dir}">
- <include name="jboss-mc.jar"/>
- </fileset>
<fileset dir="${jars.dir}">
<include name="**/netty*jar"/>
+ <exclude name="**/*-sources.jar"/>
- <exclude name="**/*-sources.jar"/>
+ <include name="**/jboss-mc.jar"/>
+ <include name="**/hornetq-core.jar"/>
+ <include name="**/hornetq-bootstrap.jar"/>
+ <include name="**/hornetq-jbossas-security.jar"/>
+ <include name="**/jboss-mc.jar"/>
+ <include name="**/jboss-kernel.jar"/>
+ <include name="**/jboss-logging-spi.jar"/>
+ <include name="**/jboss-reflect.jar"/>
+ <include name="**/jboss-common-core.jar"/>
+ <include name="**/jboss-dependency.jar"/>
+ <include name="**/jboss-mdr.jar"/>
+ <include name="**/jboss-xml-binding.jar"/>
<!-- <include name="**/*.jar"/>
Modified: branches/Branch_2_1/examples/javaee/common/build.xml
--- branches/Branch_2_1/examples/javaee/common/build.xml 2010-09-08 22:11:16 UTC (rev 9658)
+++ branches/Branch_2_1/examples/javaee/common/build.xml 2010-09-09 06:32:10 UTC (rev 9659)
@@ -12,12 +12,12 @@
~ permissions and limitations under the License.
<project default="compile" name="javaeeexample" basedir=".">
+ <property environment="ENV"/>
<dirname property="imported.basedir" file="${ant.file.javaeeexample}"/>
<property file="${imported.basedir}/config/ant.properties"/>
<property name="example.config.dir" value="config"/>
<property file="${example.config.dir}/ant.properties"/>
<property name="example.server.dir" value="server"/>
- <property environment="ENV"/>
<property name="jboss.home" value="${ENV.JBOSS_HOME}"/>
@@ -56,7 +56,7 @@
<fileset dir="${jboss.home}/client">
<include name="**/*.jar"/>
- <fileset dir="${jboss.home}/server/default-with-hornetq/lib">
+ <fileset dir="${jboss.home}/server/${server.default}/lib">
<include name="netty*.jar"/>
@@ -116,7 +116,7 @@
<target name="deploy" depends="validate-jboss, copy-profile, copy-resources, ear">
<property name="example-profile" value="${jboss.home}/server/${example.name}-example-profile"/>
- <copy todir="${example-profile}/deploy/hornetq.sar" overwrite="true">
+ <copy todir="${example-profile}/deploy/${hornetq.config.dir}" overwrite="true">
<fileset dir="${example.server.dir}"/>
<copy todir="${example-profile}/deploy/" overwrite="true">
@@ -136,7 +136,7 @@
<target name="copy-profile" depends="profile.check" unless="donot.copy.profile">
- <property name="profile" value="${jboss.home}/server/default-with-hornetq"/>
+ <property name="profile" value="${jboss.home}/server/${server.default}"/>
<property name="example-profile" value="${jboss.home}/server/${example.name}-example-profile"/>
<property name="deploy.dir" value="${example-profile}/deploy/"/>
<mkdir dir="${example-profile}"/>
@@ -168,8 +168,8 @@
<target name="validate-jboss">
<fail unless="ENV.JBOSS_HOME" message="JBOSS_HOME environment variable not set! Set it and try again."/>
- <available property="default-config" type="dir" file="${jboss.home}/server/default-with-hornetq"/>
- <fail unless="default-config" message="${jboss.home}/server/default-with-hornetq not found!"/>
+ <available property="default-config" type="dir" file="${jboss.home}/server/${server.default}"/>
+ <fail unless="default-config" message="${jboss.home}/server/${server.default} not found!"/>
<target name="runExample" depends="validate-jboss,compile">
Modified: branches/Branch_2_1/examples/javaee/common/config/ant.properties
--- branches/Branch_2_1/examples/javaee/common/config/ant.properties 2010-09-08 22:11:16 UTC (rev 9658)
+++ branches/Branch_2_1/examples/javaee/common/config/ant.properties 2010-09-09 06:32:10 UTC (rev 9659)
@@ -1,4 +1,7 @@
\ No newline at end of file
Added: branches/Branch_2_1/examples-eap/common/config/ant.properties
--- branches/Branch_2_1/examples-eap/common/config/ant.properties (rev 0)
+++ branches/Branch_2_1/examples-eap/common/config/ant.properties 2010-09-09 06:32:10 UTC (rev 9659)
@@ -0,0 +1,5 @@
Added: branches/Branch_2_1/examples-eap/javaee/common/config/ant.properties
--- branches/Branch_2_1/examples-eap/javaee/common/config/ant.properties (rev 0)
+++ branches/Branch_2_1/examples-eap/javaee/common/config/ant.properties 2010-09-09 06:32:10 UTC (rev 9659)
@@ -0,0 +1,8 @@
Added: branches/Branch_2_1/examples-eap/readme
--- branches/Branch_2_1/examples-eap/readme (rev 0)
+++ branches/Branch_2_1/examples-eap/readme 2010-09-09 06:32:10 UTC (rev 9659)
@@ -0,0 +1 @@
+This directory contains files that will be used to replace files in order to make the examples runnable on the EAP
14 years, 3 months