Author: clebert.suconic(a)jboss.com
Date: 2010-09-10 13:44:13 -0400 (Fri, 10 Sep 2010)
New Revision: 9668
Added:
trunk/examples/soak/tx-restarts/
trunk/examples/soak/tx-restarts/README
trunk/examples/soak/tx-restarts/build.bat
trunk/examples/soak/tx-restarts/build.sh
trunk/examples/soak/tx-restarts/build.xml
trunk/examples/soak/tx-restarts/server0/
trunk/examples/soak/tx-restarts/server0/client-jndi.properties
trunk/examples/soak/tx-restarts/server0/hornetq-beans.xml
trunk/examples/soak/tx-restarts/server0/hornetq-configuration.xml
trunk/examples/soak/tx-restarts/server0/hornetq-jms.xml
trunk/examples/soak/tx-restarts/server0/hornetq-users.xml
trunk/examples/soak/tx-restarts/server0/jndi.properties
trunk/examples/soak/tx-restarts/src/
trunk/examples/soak/tx-restarts/src/org/
trunk/examples/soak/tx-restarts/src/org/hornetq/
trunk/examples/soak/tx-restarts/src/org/hornetq/jms/
trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/
trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java
trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java
trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java
trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java
trunk/src/main/org/hornetq/core/journal/impl/CompactJournal.java
trunk/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
Modified:
trunk/build-hornetq.xml
trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java
trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
trunk/src/main/org/hornetq/core/journal/TestableJournal.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java
trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalRecord.java
trunk/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java
trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
trunk/src/main/org/hornetq/core/paging/PageTransactionInfo.java
trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/persistence/StorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/transaction/Transaction.java
trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
trunk/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java
trunk/src/main/org/hornetq/utils/ReusableLatch.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlRestartTest.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
trunk/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java
trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Merging fixes from Branch_2_1, r9591-r9608, r9612-r9659, r9660-r9663
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/build-hornetq.xml 2010-09-10 17:44:13 UTC (rev 9668)
@@ -87,6 +87,7 @@
<property name="resources.jar.name"
value="hornetq-resources.jar"/>
<property name="resources.sources.jar.name"
value="hornetq-resources-sources.jar"/>
<property name="twitter4j.jar.name"
value="twitter4j-core.jar"/>
+ <property name="eap.examples.zip.name"
value="hornetq-eap-examples.zip"/>
<!--source and build dirs-->
<property name="build.dir" value="build"/>
@@ -1326,8 +1327,30 @@
      <gzip src="${build.dir}/${build.artifact}.tar" destfile="${build.dir}/${build.artifact}.tar.gz"/>
</target>
+
+   <target name="eap-examples" description="Generates a file with examples tuned for the JBoss EAP" depends="init">
+ <mkdir dir="${build.dir}/eap-examples-tmp"/>
+
+
+ <copy todir="${build.dir}/eap-examples-tmp">
+         <fileset dir="${examples.dir}" excludes="**/build.sh,**/build.bat, **/twitter-connector/**"/>
+ </copy>
+
+      <copy todir="${build.dir}/eap-examples-tmp" overwrite="true">
+ <fileset dir="examples-eap"/>
+ </copy>
+
+      <replace dir="${build.dir}/eap-examples-tmp" token="hornetq-ra.rar" value="jms-ra.rar"></replace>
+
+ <zip destfile="${build.jars.dir}/${eap.examples.zip.name}">
+         <zipfileset dir="${build.dir}/eap-examples-tmp" prefix="examples/hornetq"/>
+ </zip>
+
+ <delete dir="${build.dir}/eap-examples-tmp"/>
+ </target>
+
<target name="source-distro">
<mkdir dir="${build.dir}"/>
<zip destfile="${build.dir}/${build.artifact}-src.zip">
Modified: trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java
===================================================================
--- trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -17,8 +17,6 @@
import java.util.Properties;
import java.util.logging.Logger;
-import javax.jms.Connection;
-import javax.jms.JMSException;
import javax.naming.InitialContext;
/**
@@ -32,26 +30,26 @@
private Process[] servers;
- private Connection conn;
+ protected boolean failure = false;
- private boolean failure = false;
+ protected String serverClasspath;
- private String serverClasspath;
+ protected String serverProps;
- private String serverProps;
-
public abstract boolean runExample() throws Exception;
private boolean logServerOutput;
- private String[] configs;
+ protected String[] configs;
+
+ protected boolean runServer;
protected void run(final String[] configs)
{
String runServerProp = System.getProperty("hornetq.example.runServer");
      String logServerOutputProp = System.getProperty("hornetq.example.logserveroutput");
      serverClasspath = System.getProperty("hornetq.example.server.classpath");
- boolean runServer = runServerProp == null ? true : Boolean.valueOf(runServerProp);
+ runServer = runServerProp == null ? true : Boolean.valueOf(runServerProp);
      logServerOutput = logServerOutputProp == null ? false : Boolean.valueOf(logServerOutputProp);
serverProps = System.getProperty("hornetq.example.server.args");
if (System.getProperty("hornetq.example.server.override.args") != null)
@@ -83,17 +81,6 @@
}
finally
{
- if (conn != null)
- {
- try
- {
- conn.close();
- }
- catch (JMSException e)
- {
- // ignore
- }
- }
if (runServer)
{
try
@@ -165,7 +152,7 @@
"hornetq-beans.xml");
}
- private void startServers() throws Exception
+ protected void startServers() throws Exception
{
servers = new Process[configs.length];
for (int i = 0; i < configs.length; i++)
@@ -174,7 +161,7 @@
}
}
- private void stopServers() throws Exception
+ protected void stopServers() throws Exception
{
for (Process server : servers)
{
@@ -185,7 +172,7 @@
}
}
- private void stopServer(final Process server) throws Exception
+ protected void stopServer(final Process server) throws Exception
{
if (!System.getProperty("os.name").contains("Windows")
&& !System.getProperty("os.name").contains("Mac OS X"))
{
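The visibility changes above (failure, serverClasspath, configs, the server lifecycle methods, and the new runServer flag all go from private to protected) exist so that a subclass can manage the server lifecycle itself. A minimal sketch of a hypothetical subclass using that surface, assuming only the API shown in this hunk (the class name and body are illustrative, not part of this commit):

   import org.hornetq.common.example.HornetQExample;

   // Hypothetical subclass; sketches how the now-protected members are meant to be used.
   public class RestartingExample extends HornetQExample
   {
      @Override
      public boolean runExample() throws Exception
      {
         // runServer is false when launched with -Dhornetq.example.runServer=false
         // (the runRemote ant target); the server lifecycle is then external.
         if (runServer)
         {
            stopServers();  // protected as of this change
            startServers(); // restart in place, as TXRestartSoak does
         }
         return !failure;
      }

      public static void main(final String[] args)
      {
         new RestartingExample().run(args);
      }
   }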
Added: trunk/examples/soak/tx-restarts/README
===================================================================
--- trunk/examples/soak/tx-restarts/README (rev 0)
+++ trunk/examples/soak/tx-restarts/README 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,29 @@
+****************************************************
+* Soak Test For TX survival over restarts
+****************************************************
+
+Run The Test
+==============
+
+To run the test, simply run ./build.sh
+
+It's important that you always clean the data directory before starting the test, as the test validates the message sequences it generates.
+
+The test will start and stop a server multiple times.
+
+
+Run the server remotely
+=======================
+
+You can also start the server directly if you want, by running:
+
+./run.sh PATH_TO_HORNETQ/examples/soak/tx-restarts/server0
+
+
+Then you can run the test as:
+
+./build.sh runRemote
+
+
+You can then kill and restart the server manually as many times as you want.
+
Added: trunk/examples/soak/tx-restarts/build.bat
===================================================================
--- trunk/examples/soak/tx-restarts/build.bat (rev 0)
+++ trunk/examples/soak/tx-restarts/build.bat 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,13 @@
+@echo off
+
+set "OVERRIDE_ANT_HOME=..\..\..\tools\ant"
+
+if exist "..\..\..\src\bin\build.bat" (
+ rem running from TRUNK
+ call ..\..\..\src\bin\build.bat %*
+) else (
+ rem running from the distro
+ call ..\..\..\bin\build.bat %*
+)
+
+set "OVERRIDE_ANT_HOME="
Added: trunk/examples/soak/tx-restarts/build.sh
===================================================================
--- trunk/examples/soak/tx-restarts/build.sh (rev 0)
+++ trunk/examples/soak/tx-restarts/build.sh 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,15 @@
+#!/bin/sh
+
+OVERRIDE_ANT_HOME=../../../tools/ant
+export OVERRIDE_ANT_HOME
+
+if [ -f "../../../src/bin/build.sh" ]; then
+ # running from TRUNK
+ ../../../src/bin/build.sh "$@"
+else
+ # running from the distro
+ ../../../bin/build.sh "$@"
+fi
+
+
+
Added: trunk/examples/soak/tx-restarts/build.xml
===================================================================
--- trunk/examples/soak/tx-restarts/build.xml (rev 0)
+++ trunk/examples/soak/tx-restarts/build.xml 2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+ <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+ ]>
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+  ~        http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+
+<project default="run" name="TX-Restarts soak test">
+
+ <import file="../../common/build.xml"/>
+ <property file="ant.properties"/>
+ <target name="run">
+ <antcall target="runExample">
+         <param name="example.classname" value="org.hornetq.jms.example.TXRestartSoak"/>
+
+ <param name="java-min-memory" value="1G"/>
+ <param name="java-max-memory" value="1G"/>
+ </antcall>
+ </target>
+
+ <target name="runRemote">
+ <antcall target="runExample">
+         <param name="example.classname" value="org.hornetq.jms.example.TXRestartSoak"/>
+         <param name="hornetq.example.runServer" value="false"/>
+ </antcall>
+ </target>
+
+</project>
Added: trunk/examples/soak/tx-restarts/server0/client-jndi.properties
===================================================================
--- trunk/examples/soak/tx-restarts/server0/client-jndi.properties	(rev 0)
+++ trunk/examples/soak/tx-restarts/server0/client-jndi.properties	2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: trunk/examples/soak/tx-restarts/server0/hornetq-beans.xml
===================================================================
--- trunk/examples/soak/tx-restarts/server0/hornetq-beans.xml	(rev 0)
+++ trunk/examples/soak/tx-restarts/server0/hornetq-beans.xml	2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">1099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">1098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+   <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+   <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+   <bean name="HornetQSecurityManager" class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+   <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+   <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Added: trunk/examples/soak/tx-restarts/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/soak/tx-restarts/server0/hornetq-configuration.xml	(rev 0)
+++ trunk/examples/soak/tx-restarts/server0/hornetq-configuration.xml	2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,65 @@
+<configuration xmlns="urn:hornetq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+
+
+ <journal-file-size>102400</journal-file-size>
+
+ <!-- Connectors -->
+ <connectors>
+ <connector name="netty-connector">
+         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
+ </acceptor>
+ </acceptors>
+
+ <address-settings>
+ <address-setting match="jms.queue.#">
+ <max-delivery-attempts>-1</max-delivery-attempts>
+ <!-- <max-size-bytes>335544320000</max-size-bytes> -->
+ <max-size-bytes>33554432</max-size-bytes>
+ <page-size-bytes>16777216</page-size-bytes>
+ <address-full-policy>PAGE</address-full-policy>
+ </address-setting>
+
+ </address-settings>
+
+
+ <diverts>
+ <divert name="div1">
+ <address>jms.queue.inputQueue</address>
+ <forwarding-address>jms.queue.diverted1</forwarding-address>
+ <exclusive>true</exclusive>
+ </divert>
+
+ <divert name="div2">
+ <address>jms.queue.inputQueue</address>
+ <forwarding-address>jms.queue.diverted2</forwarding-address>
+ <exclusive>true</exclusive>
+ </divert>
+ </diverts>
+
+
+
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.#">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+         <permission type="createNonDurableQueue" roles="guest"/>
+         <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
Added: trunk/examples/soak/tx-restarts/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/soak/tx-restarts/server0/hornetq-jms.xml	(rev 0)
+++ trunk/examples/soak/tx-restarts/server0/hornetq-jms.xml	2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,27 @@
+<configuration xmlns="urn:hornetq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connectors>
+ <connector-ref connector-name="netty-connector"/>
+ </connectors>
+ <min-large-message-size>100240</min-large-message-size>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="inputQueue">
+ <entry name="/queue/inputQueue"/>
+ </queue>
+
+ <queue name="diverted1">
+ <entry name="/queue/diverted1"/>
+ </queue>
+
+ <queue name="diverted2">
+ <entry name="/queue/diverted2"/>
+ </queue>
+</configuration>
Added: trunk/examples/soak/tx-restarts/server0/hornetq-users.xml
===================================================================
--- trunk/examples/soak/tx-restarts/server0/hornetq-users.xml	(rev 0)
+++ trunk/examples/soak/tx-restarts/server0/hornetq-users.xml	2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Added: trunk/examples/soak/tx-restarts/server0/jndi.properties
===================================================================
--- trunk/examples/soak/tx-restarts/server0/jndi.properties	(rev 0)
+++ trunk/examples/soak/tx-restarts/server0/jndi.properties	2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,2 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
\ No newline at end of file
Added: trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java
===================================================================
--- trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java	(rev 0)
+++ trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,278 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.example;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.core.transaction.impl.XidImpl;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * WARNING: This is not a sample of how you should handle XA.
+ * You are supposed to use a TransactionManager.
+ * This class does the job of a TransactionManager only as far as this test requires;
+ * real transaction handling has many more pitfalls to deal with.
+ *
+ * This is just to stress and soak test transactions with HornetQ,
+ * and it deals with XA directly for testing purposes only.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class ClientAbstract extends Thread
+{
+
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(ClientAbstract.class.getName());
+
+ // Attributes ----------------------------------------------------
+
+ protected InitialContext ctx;
+
+ protected XAConnection conn;
+
+ protected XASession sess;
+
+ protected XAResource activeXAResource;
+
+ protected Xid activeXid;
+
+ protected volatile boolean running = true;
+
+ protected volatile int errors = 0;
+
+ /**
+    * A commit was called.
+    * If we don't find the Xid during recovery, it means the commit was accepted.
+ */
+ protected volatile boolean pendingCommit = false;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ protected InitialContext getContext(final int serverId) throws Exception
+ {
+      String jndiFilename = "server" + serverId + "/client-jndi.properties";
+ File jndiFile = new File(jndiFilename);
+ Properties props = new Properties();
+ FileInputStream inStream = null;
+ try
+ {
+ inStream = new FileInputStream(jndiFile);
+ props.load(inStream);
+ }
+ finally
+ {
+ if (inStream != null)
+ {
+ inStream.close();
+ }
+ }
+ return new InitialContext(props);
+
+ }
+
+ public XAConnection getConnection()
+ {
+ return conn;
+ }
+
+ public int getErrorsCount()
+ {
+ return errors;
+ }
+
+ public final void connect()
+ {
+ while (running)
+ {
+ try
+ {
+ disconnect();
+
+ ctx = getContext(0);
+
+            XAConnectionFactory cf = (XAConnectionFactory)ctx.lookup("/ConnectionFactory");
+
+ conn = cf.createXAConnection();
+
+ sess = conn.createXASession();
+
+ activeXAResource = sess.getXAResource();
+
+ if (activeXid != null)
+ {
+ synchronized (ClientAbstract.class)
+ {
+ Xid[] xids = activeXAResource.recover(XAResource.TMSTARTRSCAN);
+ boolean found = false;
+ for (Xid recXid : xids)
+ {
+ if (recXid.equals(activeXid))
+ {
+                        // System.out.println("Calling commit after a prepare on " + this);
+ found = true;
+ callCommit();
+ }
+ }
+
+ if (!found)
+ {
+ if (pendingCommit)
+ {
+                     System.out.println("Doing a commit based on a pending commit on " + this);
+ onCommit();
+ }
+ else
+ {
+ System.out.println("Doing a rollback on " + this);
+ onRollback();
+ }
+
+ activeXid = null;
+ pendingCommit = false;
+ }
+ }
+ }
+
+ connectClients();
+
+ break;
+ }
+ catch (Exception e)
+ {
+            ClientAbstract.log.warning("Can't connect to server, retrying");
+ disconnect();
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException ignored)
+ {
+ // if an interruption was sent, we will respect it and leave the loop
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ connect();
+ }
+
+ protected void callCommit() throws Exception
+ {
+ pendingCommit = true;
+ activeXAResource.commit(activeXid, false);
+ pendingCommit = false;
+ activeXid = null;
+ onCommit();
+ }
+
+ protected void callPrepare() throws Exception
+ {
+ activeXAResource.prepare(activeXid);
+ }
+
+ public void beginTX() throws Exception
+ {
+ activeXid = newXID();
+
+ activeXAResource.start(activeXid, XAResource.TMNOFLAGS);
+ }
+
+ public void endTX() throws Exception
+ {
+ activeXAResource.end(activeXid, XAResource.TMSUCCESS);
+ callPrepare();
+ callCommit();
+ }
+
+ public void setRunning(final boolean running)
+ {
+ this.running = running;
+ }
+
+ /**
+    * @return a newly created Xid for this client
+ */
+ private XidImpl newXID()
+ {
+      return new XidImpl("tst".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+ }
+
+ protected abstract void connectClients() throws Exception;
+
+ protected abstract void onCommit();
+
+ protected abstract void onRollback();
+
+ public void disconnect()
+ {
+ try
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ ignored.printStackTrace();
+ }
+
+ try
+ {
+ if (ctx != null)
+ {
+ ctx.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ ignored.printStackTrace();
+ }
+
+ ctx = null;
+ conn = null;
+      // it's not necessary to close the session, as conn.close() will already take care of that
+ sess = null;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
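For orientation, the XA lifecycle that ClientAbstract drives by hand (a TransactionManager's job, per the warning above) reduces to the calls below. This is a condensed sketch using only javax.transaction.xa, for illustration; it is not additional code from this commit:

   import javax.transaction.xa.XAResource;
   import javax.transaction.xa.Xid;

   // Condensed model of beginTX()/endTX()/connect() above.
   final class XaLifecycleSketch
   {
      static void runBatch(final XAResource res, final Xid xid) throws Exception
      {
         res.start(xid, XAResource.TMNOFLAGS); // beginTX(): enlist work under xid
         // ... produce or consume a batch of messages here ...
         res.end(xid, XAResource.TMSUCCESS);   // endTX(): dissociate the work
         res.prepare(xid);                     // phase 1: the server persists the vote
         res.commit(xid, false);               // phase 2: false = full two-phase commit
      }

      // After a restart, connect() scans the in-doubt branches exactly like this:
      static boolean survivedPrepare(final XAResource res, final Xid xid) throws Exception
      {
         for (Xid recovered : res.recover(XAResource.TMSTARTRSCAN))
         {
            if (recovered.equals(xid))
            {
               return true; // prepared on the server, so it is safe to commit
            }
         }
         return false; // never prepared (or already committed): see pendingCommit above
      }
   }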
Added: trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java
===================================================================
--- trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java	(rev 0)
+++ trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.example;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+
+import org.hornetq.utils.ReusableLatch;
+
+/**
+ * A Receiver
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class Receiver extends ClientAbstract
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private Queue queue;
+
+   // We should leave some messages in paging; we don't want to consume them all in this test
+ private final Semaphore minConsume = new Semaphore(0);
+
+ private final ReusableLatch latchMax = new ReusableLatch(0);
+
+ private static final int MAX_DIFF = 10000;
+
+   // The difference between the produced and consumed message counts
+ private final AtomicInteger currentDiff = new AtomicInteger(0);
+
+ private final String queueJNDI;
+
+ protected long msgs = 0;
+
+ protected int pendingMsgs = 0;
+
+ protected int pendingSemaphores = 0;
+
+ protected MessageConsumer cons;
+
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public Receiver(String queueJNDI)
+ {
+ super();
+ this.queueJNDI = queueJNDI;
+ }
+
+ // Public --------------------------------------------------------
+
+ public void run()
+ {
+ super.run();
+
+ while (running)
+ {
+ try
+ {
+ beginTX();
+
+ for (int i = 0 ; i < 1000; i++)
+ {
+ Message msg = cons.receive(5000);
+ if (msg == null)
+ {
+ break;
+ }
+
+ if (msg.getLongProperty("count") != msgs + pendingMsgs)
+ {
+ errors++;
+                  System.out.println("count should be " + (msgs + pendingMsgs) + " when it was " + msg.getLongProperty("count") + " on " + queueJNDI);
+ }
+
+ pendingMsgs++;
+ if (!minConsume.tryAcquire(1, 5, TimeUnit.SECONDS))
+ {
+ break;
+ }
+
+ }
+
+ endTX();
+ }
+ catch (Exception e)
+ {
+ connect();
+ }
+
+
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#connectClients()
+ */
+ @Override
+ protected void connectClients() throws Exception
+ {
+
+ queue = (Queue)ctx.lookup(queueJNDI);
+
+ cons = sess.createConsumer(queue);
+
+ conn.start();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onCommit()
+ */
+ @Override
+ protected void onCommit()
+ {
+ msgs += pendingMsgs;
+ this.currentDiff.addAndGet(-pendingMsgs);
+ latchMax.countDown(pendingMsgs);
+ pendingMsgs = 0;
+      System.out.println("Commit on consumer " + queueJNDI + ", msgs=" + msgs + " currentDiff = " + currentDiff);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onRollback()
+ */
+ @Override
+ protected void onRollback()
+ {
+      System.out.println("Rollback on consumer " + queueJNDI + ", msgs=" + msgs);
+ minConsume.release(pendingMsgs);
+ pendingMsgs = 0;
+ }
+
+ public String toString()
+ {
+      return "Receiver::" + this.queueJNDI + ", msgs=" + msgs + ", pending=" + pendingMsgs;
+ }
+
+ /**
+    * @param producedMessages the number of messages produced since the last call
+ */
+ public void messageProduced(int producedMessages)
+ {
+ minConsume.release(producedMessages);
+ currentDiff.addAndGet(producedMessages);
+      System.out.println("Msg produced on " + this.queueJNDI + ", currentDiff = " + currentDiff);
+ if (currentDiff.get() > MAX_DIFF)
+ {
+ System.out.println("Holding producer for 5 seconds");
+ latchMax.setCount(currentDiff.get() - MAX_DIFF);
+ try
+ {
+ latchMax.await(5, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
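Receiver and Sender together implement a simple credit scheme: each committed producer batch releases permits on minConsume and raises currentDiff, the consumer acquires one permit per message, and once production runs more than MAX_DIFF ahead the producer parks on the ReusableLatch for up to five seconds. A JMS-free model of that handshake (illustrative only; the real code is above):

   import java.util.concurrent.Semaphore;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   // JMS-free model of the Sender/Receiver credit handshake.
   final class CreditSketch
   {
      static final int MAX_DIFF = 10000;

      private final Semaphore minConsume = new Semaphore(0);         // one permit per consumable message
      private final AtomicInteger currentDiff = new AtomicInteger(); // produced minus consumed

      // Sender.onCommit() -> Receiver.messageProduced(n)
      void messageProduced(final int n) throws InterruptedException
      {
         minConsume.release(n);
         if (currentDiff.addAndGet(n) > MAX_DIFF)
         {
            // the real code waits on a ReusableLatch sized to the overshoot
            Thread.sleep(TimeUnit.SECONDS.toMillis(5));
         }
      }

      // Receiver loop: take one message if a credit arrives within five seconds.
      boolean mayConsumeOne() throws InterruptedException
      {
         final boolean ok = minConsume.tryAcquire(1, 5, TimeUnit.SECONDS);
         if (ok)
         {
            currentDiff.decrementAndGet();
         }
         return ok;
      }
   }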
Added: trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java
===================================================================
--- trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java	(rev 0)
+++ trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/Sender.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.example;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+
+
+/**
+ * A Sender
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class Sender extends ClientAbstract
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected MessageProducer producer;
+ protected Queue queue;
+
+ protected long msgs = TXRestartSoak.MIN_MESSAGES_ON_QUEUE;
+ protected int pendingMsgs = 0;
+
+ protected final Receiver[] receivers;
+
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public Sender(final Receiver[] receivers)
+ {
+ this.receivers = receivers;
+ }
+
+ @Override
+ protected void connectClients() throws Exception
+ {
+ queue = (Queue)ctx.lookup("/queue/inputQueue");
+ producer = sess.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ }
+
+ public void run()
+ {
+ super.run();
+ while (running)
+ {
+ try
+ {
+ beginTX();
+ for (int i = 0 ; i < 1000; i++)
+ {
+ BytesMessage msg = sess.createBytesMessage();
+ msg.setLongProperty("count", pendingMsgs + msgs);
+ msg.writeBytes(new byte[10 * 1024]);
+ producer.send(msg);
+ pendingMsgs++;
+ }
+ endTX();
+ }
+ catch (Exception e)
+ {
+ connect();
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onCommit()
+ */
+ @Override
+ protected void onCommit()
+ {
+ this.msgs += pendingMsgs;
+ for (Receiver rec : receivers)
+ {
+ rec.messageProduced(pendingMsgs);
+ }
+
+ pendingMsgs = 0;
+ System.out.println("commit on sender msgs = " + msgs );
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.jms.example.ClientAbstract#onRollback()
+ */
+ @Override
+ protected void onRollback()
+ {
+ pendingMsgs = 0;
+ System.out.println("Rolled back msgs=" + msgs);
+ }
+
+ public String toString()
+ {
+ return "Sender, msgs=" + msgs + ", pending=" + pendingMsgs;
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java
===================================================================
--- trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java	(rev 0)
+++ trunk/examples/soak/tx-restarts/src/org/hornetq/jms/example/TXRestartSoak.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.InitialContext;
+
+import org.hornetq.common.example.HornetQExample;
+
+/**
+ *
+ * This soak test verifies HornetQ's ability to preserve persistent messages over restarts.
+ *
+ * This is used as a smoke test before releases.
+ *
+ * WARNING: This is not a sample of how you should handle XA.
+ * You are supposed to use a TransactionManager.
+ * This class does the job of a TransactionManager only as far as this test requires;
+ * real transaction handling has many more pitfalls to deal with.
+ *
+ * This is just to stress and soak test transactions with HornetQ,
+ * and it deals with XA directly for testing purposes only.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class TXRestartSoak extends HornetQExample
+{
+
+ public static final int MIN_MESSAGES_ON_QUEUE = 50000;
+
+ private static final Logger log = Logger.getLogger(TXRestartSoak.class.getName());
+
+ public static void main(final String[] args)
+ {
+ new TXRestartSoak().run(args);
+ }
+
+ private TXRestartSoak()
+ {
+ super();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.common.example.HornetQExample#runExample()
+ */
+ @Override
+ public boolean runExample() throws Exception
+ {
+
+ Connection connection = null;
+ InitialContext initialContext = null;
+
+ try
+ {
+ // Step 1. Create an initial context to perform the JNDI lookup.
+ initialContext = getContext(0);
+
+         ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+
+         // Step 2. Create the JMS connection
+ connection = cf.createConnection();
+
+         // Step 3. Perform a lookup on the queue
+ Queue queue = (Queue)initialContext.lookup("/queue/inputQueue");
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer producer = session.createProducer(queue);
+
+ for (int i = 0 ; i < MIN_MESSAGES_ON_QUEUE; i++)
+ {
+ BytesMessage msg = session.createBytesMessage();
+ msg.setLongProperty("count", i);
+ msg.writeBytes(new byte[10 * 1024]);
+ producer.send(msg);
+
+ if (i % 1000 == 0)
+ {
+ System.out.println("Sent " + i + " messages");
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ Receiver rec1 = new Receiver("/queue/diverted1");
+ Receiver rec2 = new Receiver("/queue/diverted2");
+
+ Sender send = new Sender(new Receiver[]{rec1, rec2});
+
+ send.start();
+ rec1.start();
+ rec2.start();
+
+ long timeEnd = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
+
+ if (runServer)
+ {
+ while (timeEnd > System.currentTimeMillis())
+ {
+ System.out.println("Letting the service run for 20 seconds");
+ Thread.sleep(TimeUnit.SECONDS.toMillis(20));
+ stopServers();
+
+ Thread.sleep(10000);
+
+ boolean disconnected = false;
+
+               if (send.getErrorsCount() != 0 || rec1.getErrorsCount() != 0 || rec2.getErrorsCount() != 0)
+               {
+                  System.out.println("There are sequence errors in some of the clients, please look at the logs");
+                  break;
+               }
+
+ while (!disconnected)
+ {
+                  disconnected = send.getConnection() == null && rec1.getConnection() == null && rec2.getConnection() == null;
+ if (!disconnected)
+ {
+                     System.out.println("NOT ALL THE CLIENTS ARE DISCONNECTED, NEED TO WAIT FOR THEM");
+ }
+ Thread.sleep(1000);
+ }
+
+ startServers();
+ }
+ }
+ else
+ {
+ while (timeEnd > System.currentTimeMillis())
+ {
+               if (send.getErrorsCount() != 0 || rec1.getErrorsCount() != 0 || rec2.getErrorsCount() != 0)
+               {
+                  System.out.println("There are sequence errors in some of the clients, please look at the logs");
+                  break;
+               }
+ Thread.sleep(10000);
+ }
+ }
+
+ send.setRunning(false);
+ rec1.setRunning(false);
+ rec2.setRunning(false);
+
+ send.join();
+ rec1.join();
+ rec2.join();
+
+         return send.getErrorsCount() == 0 && rec1.getErrorsCount() == 0 && rec2.getErrorsCount() == 0;
+
+ }
+ finally
+ {
+         if (connection != null)
+         {
+            connection.close();
+         }
+ }
+
+ }
+}
Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -77,6 +77,11 @@
Element e = org.hornetq.utils.XMLUtil.stringToElement(xml);
FileConfigurationParser parser = new FileConfigurationParser();
+
+      // https://jira.jboss.org/browse/HORNETQ-478 - We only want to validate AIO when
+      // starting the server, and we don't want to do it when deploying
+      // hornetq-queues.xml, which uses the same parser and XML format
+ parser.setValidateAIO(true);
parser.parseMainConfig(e, this);
Modified: trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -123,13 +123,31 @@
   private static final String SEND_TO_DLA_ON_NO_ROUTE = "send-to-dla-on-no-route";
// Attributes ----------------------------------------------------
+
+ private boolean validateAIO = false;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+
+ /**
+ * @return the validateAIO
+ */
+ public boolean isValidateAIO()
+ {
+ return validateAIO;
+ }
+ /**
+ * @param validateAIO the validateAIO to set
+ */
+ public void setValidateAIO(boolean validateAIO)
+ {
+ this.validateAIO = validateAIO;
+ }
+
public Configuration parseMainConfig(final InputStream input) throws Exception
{
@@ -431,7 +449,10 @@
}
else
{
-         log.warn("AIO wasn't located on this platform, it will fall back to using pure Java NIO. If your platform is Linux, install LibAIO to enable the AIO journal");
+         if (validateAIO)
+         {
+            log.warn("AIO wasn't located on this platform, it will fall back to using pure Java NIO. If your platform is Linux, install LibAIO to enable the AIO journal");
+ }
config.setJournalType(JournalType.NIO);
}
Modified: trunk/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/TestableJournal.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/TestableJournal.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -49,8 +49,6 @@
void forceMoveNextFile() throws Exception;
- void forceMoveNextFile(boolean synchronous) throws Exception;
-
void setAutoReclaim(boolean autoReclaim);
boolean isAutoReclaim();
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -279,8 +279,8 @@
aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
}
-
- public void writeInternal(ByteBuffer bytes) throws Exception
+
+ public void writeInternal(final ByteBuffer bytes) throws Exception
{
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
@@ -289,7 +289,6 @@
aioFile.writeInternal(positionToWrite, bytesToWrite, bytes);
}
-
// Protected methods
//
-----------------------------------------------------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -146,7 +146,8 @@
super.start();
      pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
-                                                                             true, getThisClassLoader()));
+                                                                             true,
+                                                                             AIOSequentialFileFactory.getThisClassLoader()));
}
@@ -295,18 +296,17 @@
}
}
}
-
+
private static ClassLoader getThisClassLoader()
{
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
-            return ClientSessionFactoryImpl.class.getClassLoader();
- }
- });
-
+ {
+ public ClassLoader run()
+ {
+ return ClientSessionFactoryImpl.class.getClassLoader();
+ }
+ });
+
}
-
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -55,6 +55,8 @@
protected SequentialFile sequentialFile;
+ protected final JournalFilesRepository filesRepository;
+
protected long nextOrderingID;
private HornetQBuffer writingChannel;
@@ -69,11 +71,13 @@
protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory,
final JournalImpl journal,
+ final JournalFilesRepository filesRepository,
final Set<Long> recordsSnapshot,
final long nextOrderingID)
{
super();
this.journal = journal;
+ this.filesRepository = filesRepository;
this.fileFactory = fileFactory;
this.nextOrderingID = nextOrderingID;
this.recordsSnapshot.addAll(recordsSnapshot);
@@ -150,16 +154,14 @@
                                       new ByteArrayEncoding(filesToRename.toByteBuffer().array()));
-
-
      HornetQBuffer renameBuffer = HornetQBuffers.dynamicBuffer(filesToRename.writerIndex());
controlRecord.setFileID(0);
-
+
controlRecord.encode(renameBuffer);
ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
-
+
      writeBuffer.put(renameBuffer.toByteBuffer().array(), 0, renameBuffer.writerIndex());
writeBuffer.rewind();
@@ -180,10 +182,10 @@
if (writingChannel != null)
{
sequentialFile.position(0);
-
+
// To Fix the size of the file
writingChannel.writerIndex(writingChannel.capacity());
-
+
sequentialFile.writeInternal(writingChannel.toByteBuffer());
sequentialFile.close();
newDataFiles.add(currentFile);
@@ -212,14 +214,14 @@
writingChannel = HornetQBuffers.wrappedBuffer(bufferWrite);
- currentFile = journal.getFile(false, false, false, true);
-
+ currentFile = filesRepository.takeFile(false, false, false, true);
+
sequentialFile = currentFile.getFile();
sequentialFile.open(1, false);
      currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++, JournalImpl.FORMAT_VERSION);
-
+
      JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -117,7 +117,8 @@
if (isSupportsCallbacks())
{
         writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this),
-                                                                                   true, getThisClassLoader()));
+                                                                                   true,
+                                                                                   AbstractSequentialFileFactory.getThisClassLoader()));
}
}
@@ -193,14 +194,13 @@
private static ClassLoader getThisClassLoader()
{
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
-            return ClientSessionFactoryImpl.class.getClassLoader();
- }
- });
-
+ {
+ public ClassLoader run()
+ {
+ return ClientSessionFactoryImpl.class.getClassLoader();
+ }
+ });
+
}
-
}
Added: trunk/src/main/org/hornetq/core/journal/impl/CompactJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/CompactJournal.java	(rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/CompactJournal.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+/**
+ * This is an undocumented class that will open a journal and force compacting on it.
+ * It may be useful in special cases, but it shouldn't be needed under regular
+ * circumstances, as the system should detect the need for compacting on its own.
+ *
+ * The regular approach is to configure the min-compact parameters instead.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class CompactJournal
+{
+
+ public static void main(final String arg[])
+ {
+ if (arg.length != 4)
+ {
+         System.err.println("Use: java -cp hornetq-core.jar org.hornetq.core.journal.impl.CompactJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize>");
+ return;
+ }
+
+ try
+ {
+         CompactJournal.compactJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]));
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ }
+
+ public static void compactJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize) throws Exception
+ {
+ NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+
+ JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix,
journalSuffix, 1);
+
+ journal.start();
+
+ journal.loadInternalOnly();
+
+ journal.compact();
+
+ journal.stop();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
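As a usage note, the class can be driven from the command line (see main above) or programmatically. A hypothetical invocation follows; the directory, prefix, extension, and file size are illustrative values, not taken from this commit:

   // Hypothetical programmatic use of CompactJournal.
   public class CompactJournalUsage
   {
      public static void main(final String[] args) throws Exception
      {
         // Roughly equivalent to:
         // java -cp hornetq-core.jar org.hornetq.core.journal.impl.CompactJournal \
         //      data/journal hornetq-data hq 10485760
         org.hornetq.core.journal.impl.CompactJournal.compactJournal("data/journal", // journal directory (example)
                                                                     "hornetq-data", // file prefix (example)
                                                                     "hq",           // file extension (example)
                                                                     2,              // minFiles
                                                                     10485760);      // fileSize in bytes (example)
      }
   }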
Modified: trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -19,8 +19,7 @@
import java.io.PrintStream;
import java.util.List;
-import org.hornetq.core.journal.*;
-
+import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.utils.Base64;
@@ -48,7 +47,7 @@
// Public --------------------------------------------------------
- public static void main(String arg[])
+ public static void main(final String arg[])
{
if (arg.length != 5)
{
@@ -58,7 +57,7 @@
try
{
- exportJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
+         ExportJournal.exportJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
}
catch (Exception e)
{
@@ -67,32 +66,31 @@
}
- public static void exportJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- String fileOutput) throws Exception
+ public static void exportJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final String fileOutput) throws Exception
{
-
+
FileOutputStream fileOut = new FileOutputStream(new File(fileOutput));
BufferedOutputStream buffOut = new BufferedOutputStream(fileOut);
PrintStream out = new PrintStream(buffOut);
-
- exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
-
+
+      ExportJournal.exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
+
out.close();
}
-
- public static void exportJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- PrintStream out) throws Exception
+ public static void exportJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final PrintStream out) throws Exception
{
NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
@@ -104,7 +102,7 @@
{
out.println("#File," + file);
- exportJournalFile(out, nio, file);
+ ExportJournal.exportJournalFile(out, nio, file);
}
}
@@ -114,67 +112,71 @@
* @param file
* @throws Exception
*/
-   public static void exportJournalFile(final PrintStream out, SequentialFileFactory fileFactory, JournalFile file) throws Exception
+   public static void exportJournalFile(final PrintStream out,
+                                        final SequentialFileFactory fileFactory,
+                                        final JournalFile file) throws Exception
{
JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
{
-         public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+         public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
          {
-            out.println("operation@UpdateTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+            out.println("operation@UpdateTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
          }

-         public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
+         public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
          {
-            out.println("operation@Update," + describeRecord(recordInfo));
+            out.println("operation@Update," + ExportJournal.describeRecord(recordInfo));
          }

-         public void onReadRollbackRecord(long transactionID) throws Exception
+         public void onReadRollbackRecord(final long transactionID) throws Exception
          {
            out.println("operation@Rollback,txID@" + transactionID);
          }

-         public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+         public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
          {
            out.println("operation@Prepare,txID@" + transactionID +
                        ",numberOfRecords@" + numberOfRecords +
                        ",extraData@" +
-                       encode(extraData));
+                       ExportJournal.encode(extraData));
          }

-         public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+         public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
          {
-            out.println("operation@DeleteRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+            out.println("operation@DeleteRecordTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
          }

-         public void onReadDeleteRecord(long recordID) throws Exception
+         public void onReadDeleteRecord(final long recordID) throws Exception
          {
            out.println("operation@DeleteRecord,id@" + recordID);
          }

-         public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+         public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
          {
            out.println("operation@Commit,txID@" + transactionID + ",numberOfRecords@" + numberOfRecords);
          }

-         public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+         public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
          {
-            out.println("operation@AddRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+            out.println("operation@AddRecordTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
          }

-         public void onReadAddRecord(RecordInfo recordInfo) throws Exception
+         public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
          {
-            out.println("operation@AddRecord," + describeRecord(recordInfo));
+            out.println("operation@AddRecord," + ExportJournal.describeRecord(recordInfo));
}
- public void markAsDataFile(JournalFile file)
+ public void markAsDataFile(final JournalFile file)
{
}
});
}
- private static String describeRecord(RecordInfo recordInfo)
+ private static String describeRecord(final RecordInfo recordInfo)
{
return "id@" + recordInfo.id +
",userRecordType@" +
@@ -186,10 +188,10 @@
",compactCount@" +
recordInfo.compactCount +
",data@" +
- encode(recordInfo.data);
+ ExportJournal.encode(recordInfo.data);
}
- private static String encode(byte[] data)
+ private static String encode(final byte[] data)
{
      return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-09-10 15:23:04
UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-09-10 17:44:13
UTC (rev 9668)
@@ -25,7 +25,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.impl.JournalRecord;
import org.hornetq.utils.Base64;
/**
@@ -52,7 +51,7 @@
// Public --------------------------------------------------------
- public static void main(String arg[])
+ public static void main(final String arg[])
{
if (arg.length != 5)
{
@@ -62,7 +61,7 @@
try
{
- importJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
+         ImportJournal.importJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
}
catch (Exception e)
{
@@ -71,34 +70,35 @@
}
- public static void importJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- String fileInput) throws Exception
+ public static void importJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final String fileInput) throws Exception
{
FileInputStream fileInputStream = new FileInputStream(new File(fileInput));
-      importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
+      ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
}
- public static void importJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- InputStream stream) throws Exception
+
+ public static void importJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final InputStream stream) throws Exception
{
Reader reader = new InputStreamReader(stream);
-      importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
+      ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
}
- public static void importJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- Reader reader) throws Exception
+ public static void importJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final Reader reader) throws Exception
{
File journalDir = new File(directory);
@@ -139,7 +139,7 @@
continue;
}
- Properties lineProperties = parseLine(splitLine);
+ Properties lineProperties = ImportJournal.parseLine(splitLine);
String operation = null;
try
@@ -148,67 +148,67 @@
if (operation.equals("AddRecord"))
{
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendAddRecord(info.id, info.userRecordType, info.data, false);
}
else if (operation.equals("AddRecordTX"))
{
- long txID = parseLong("txID", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
counter.incrementAndGet();
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType,
info.data);
}
else if (operation.equals("UpdateTX"))
{
- long txID = parseLong("txID", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
counter.incrementAndGet();
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendUpdateRecordTransactional(txID, info.id,
info.userRecordType, info.data);
}
else if (operation.equals("Update"))
{
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendUpdateRecord(info.id, info.userRecordType, info.data,
false);
}
else if (operation.equals("DeleteRecord"))
{
- long id = parseLong("id", lineProperties);
+ long id = ImportJournal.parseLong("id", lineProperties);
// If not found it means the append/update records were reclaimed already
- if (journalRecords.get((Long)id) != null)
+ if (journalRecords.get(id) != null)
{
journal.appendDeleteRecord(id, false);
}
}
else if (operation.equals("DeleteRecordTX"))
{
- long txID = parseLong("txID", lineProperties);
- long id = parseLong("id", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ long id = ImportJournal.parseLong("id", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
counter.incrementAndGet();
// If not found it means the append/update records were reclaimed already
- if (journalRecords.get((Long)id) != null)
+ if (journalRecords.get(id) != null)
{
journal.appendDeleteRecordTransactional(txID, id);
}
}
else if (operation.equals("Prepare"))
{
-                  long txID = parseLong("txID", lineProperties);
-                  int numberOfRecords = parseInt("numberOfRecords", lineProperties);
-                  AtomicInteger counter = getCounter(txID, txCounters);
-                  byte[] data = parseEncoding("extraData", lineProperties);
+                  long txID = ImportJournal.parseLong("txID", lineProperties);
+                  int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties);
+                  AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
+                  byte[] data = ImportJournal.parseEncoding("extraData", lineProperties);
if (counter.get() == numberOfRecords)
{
@@ -227,9 +227,9 @@
}
else if (operation.equals("Commit"))
{
-                  long txID = parseLong("txID", lineProperties);
-                  int numberOfRecords = parseInt("numberOfRecords", lineProperties);
-                  AtomicInteger counter = getCounter(txID, txCounters);
+                  long txID = ImportJournal.parseLong("txID", lineProperties);
+                  int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties);
+                  AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
if (counter.get() == numberOfRecords)
{
journal.appendCommitRecord(txID, false);
@@ -247,7 +247,7 @@
}
else if (operation.equals("Rollback"))
{
- long txID = parseLong("txID", lineProperties);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
journal.appendRollbackRecord(txID, false);
}
else
@@ -264,7 +264,7 @@
journal.stop();
}
-   protected static AtomicInteger getCounter(Long txID, Map<Long, AtomicInteger> txCounters)
+   protected static AtomicInteger getCounter(final Long txID, final Map<Long, AtomicInteger> txCounters)
{
AtomicInteger counter = txCounters.get(txID);
@@ -277,50 +277,50 @@
return counter;
}
- protected static RecordInfo parseRecord(Properties properties) throws Exception
+ protected static RecordInfo parseRecord(final Properties properties) throws Exception
{
- long id = parseLong("id", properties);
- byte userRecordType = parseByte("userRecordType", properties);
- boolean isUpdate = parseBoolean("isUpdate", properties);
- byte[] data = parseEncoding("data", properties);
+ long id = ImportJournal.parseLong("id", properties);
+      byte userRecordType = ImportJournal.parseByte("userRecordType", properties);
+ boolean isUpdate = ImportJournal.parseBoolean("isUpdate", properties);
+ byte[] data = ImportJournal.parseEncoding("data", properties);
return new RecordInfo(id, userRecordType, data, isUpdate, (short)0);
}
-   private static byte[] parseEncoding(String name, Properties properties) throws Exception
+   private static byte[] parseEncoding(final String name, final Properties properties) throws Exception
{
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
- return decode(value);
+ return ImportJournal.decode(value);
}
/**
* @param properties
* @return
*/
- private static int parseInt(String name, Properties properties) throws Exception
+   private static int parseInt(final String name, final Properties properties) throws Exception
{
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
return Integer.parseInt(value);
}
- private static long parseLong(String name, Properties properties) throws Exception
+   private static long parseLong(final String name, final Properties properties) throws Exception
{
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
return Long.parseLong(value);
}
-   private static boolean parseBoolean(String name, Properties properties) throws Exception
+   private static boolean parseBoolean(final String name, final Properties properties) throws Exception
{
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
return Boolean.parseBoolean(value);
}
- private static byte parseByte(String name, Properties properties) throws Exception
+   private static byte parseByte(final String name, final Properties properties) throws Exception
{
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
return Byte.parseByte(value);
}
@@ -331,7 +331,7 @@
* @return
* @throws Exception
*/
-   private static String parseString(String name, Properties properties) throws Exception
+   private static String parseString(final String name, final Properties properties) throws Exception
{
String value = properties.getProperty(name);
@@ -342,7 +342,7 @@
return value;
}
- protected static Properties parseLine(String[] splitLine)
+ protected static Properties parseLine(final String[] splitLine)
{
Properties properties = new Properties();
@@ -362,7 +362,7 @@
return properties;
}
- private static byte[] decode(String data)
+ private static byte[] decode(final String data)
{
return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-09-10 15:23:04
UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-09-10 17:44:13
UTC (rev 9668)
@@ -47,10 +47,10 @@
{
private static final Logger log = Logger.getLogger(JournalCompactor.class);
-
+
   // We try to separate old records from new ones when doing the compacting
   // this is a split line
-   // We will force a moveNextFiles when the compactCount is bellow than COMPACT_SPLIT_LINE
+   // We will force a moveNextFile when the compactCount is below COMPACT_SPLIT_LINE
private final short COMPACT_SPLIT_LINE = 2;
// Snapshot of transactions that were pending when the compactor started
@@ -144,10 +144,11 @@
public JournalCompactor(final SequentialFileFactory fileFactory,
final JournalImpl journal,
+ final JournalFilesRepository filesRepository,
final Set<Long> recordsSnapshot,
final long firstFileID)
{
- super(fileFactory, journal, recordsSnapshot, firstFileID);
+ super(fileFactory, journal, filesRepository, recordsSnapshot, firstFileID);
}
/** This methods informs the Compactor about the existence of a pending (non
committed) transaction */
@@ -216,7 +217,7 @@
{
checkSize(size, -1);
}
-
+
private void checkSize(final int size, final int compactCount) throws Exception
{
if (getWritingChannel() == null)
@@ -238,17 +239,19 @@
return;
}
}
-
+
if (getWritingChannel().writerIndex() + size >
getWritingChannel().capacity())
{
openFile();
}
}
}
-
+
int currentCount;
+
   // This means we will need to split when the compactCount is below the watermark
boolean willNeedToSplit = false;
+
boolean splitted = false;
private boolean checkCompact(final int compactCount) throws Exception
@@ -257,7 +260,7 @@
{
willNeedToSplit = true;
}
-
+
if (willNeedToSplit && compactCount < COMPACT_SPLIT_LINE)
{
willNeedToSplit = false;
@@ -270,8 +273,6 @@
return false;
}
}
-
-
/**
* Replay pending counts that happened during compacting
@@ -304,7 +305,7 @@
info.getUserRecordType(),
new
ByteArrayEncoding(info.data));
addRecord.setCompactCount((short)(info.compactCount + 1));
-
+
checkSize(addRecord.getEncodeSize(), info.compactCount);
writeEncoder(addRecord);
@@ -326,7 +327,7 @@
new
ByteArrayEncoding(info.data));
record.setCompactCount((short)(info.compactCount + 1));
-
+
checkSize(record.getEncodeSize(), info.compactCount);
newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
@@ -346,15 +347,15 @@
}
else
{
- JournalTransaction newTransaction = newTransactions.remove(transactionID);
+ JournalTransaction newTransaction = newTransactions.remove(transactionID);
if (newTransaction != null)
{
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(true,
transactionID, null);
-
+
checkSize(commitRecord.getEncodeSize());
-
+
writeEncoder(commitRecord, newTransaction.getCounter(currentFile));
-
+
newTransaction.commit(currentFile);
}
}
@@ -365,7 +366,8 @@
if (newRecords.get(recordID) != null)
{
// Sanity check, it should never happen
-         throw new IllegalStateException("Inconsistency during compacting: Delete record being read on an existent record (id=" + recordID + ")");
+ throw new IllegalStateException("Inconsistency during compacting: Delete
record being read on an existent record (id=" + recordID +
+ ")");
}
}
@@ -427,16 +429,16 @@
JournalTransaction newTransaction = newTransactions.remove(transactionID);
if (newTransaction != null)
{
-
+
JournalInternalRecord rollbackRecord = new
JournalRollbackRecordTX(transactionID);
-
+
checkSize(rollbackRecord.getEncodeSize());
writeEncoder(rollbackRecord);
-
+
newTransaction.rollback(currentFile);
}
-
+
}
}
@@ -450,7 +452,7 @@
new
ByteArrayEncoding(info.data));
updateRecord.setCompactCount((short)(info.compactCount + 1));
-
+
checkSize(updateRecord.getEncodeSize(), info.compactCount);
JournalRecord newRecord = newRecords.get(info.id);
@@ -482,7 +484,7 @@
new
ByteArrayEncoding(info.data));
updateRecordTX.setCompactCount((short)(info.compactCount + 1));
-
+
checkSize(updateRecordTX.getEncodeSize(), info.compactCount);
writeEncoder(updateRecordTX);
@@ -531,7 +533,14 @@
void execute() throws Exception
{
JournalRecord deleteRecord = journal.getRecords().remove(id);
- deleteRecord.delete(usedFile);
+ if (deleteRecord == null)
+ {
+                     JournalCompactor.log.warn("Can't find record " + id + " during compact replay");
+ }
+ else
+ {
+ deleteRecord.delete(usedFile);
+ }
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-09-10 15:23:04 UTC
(rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-09-10 17:44:13 UTC
(rev 9668)
@@ -45,7 +45,7 @@
void decSize(int bytes);
int getLiveSize();
-
+
/** The total number of deletes this file has */
int getTotalNegativeToOthers();
@@ -58,9 +58,9 @@
/** This is a field to identify that records on this file actually belong to the
current file.
* The possible implementation for this is fileID & Integer.MAX_VALUE */
int getRecordID();
-
+
long getFileID();
-
+
int getJournalVersion();
SequentialFile getFile();
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-09-10 15:23:04
UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-09-10 17:44:13
UTC (rev 9668)
@@ -47,7 +47,7 @@
private boolean canReclaim;
- private AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
+ private final AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
private final int version;
@@ -61,7 +61,7 @@
this.version = version;
- this.recordID = (int)(fileID & (long)Integer.MAX_VALUE);
+ recordID = (int)(fileID & Integer.MAX_VALUE);
}
public void clearCounts()
@@ -165,7 +165,7 @@
{
try
{
-         return "JournalFileImpl: (" + file.getFileName() + " id = " + this.fileID + ", recordID = " + recordID + ")";
+         return "JournalFileImpl: (" + file.getFileName() + " id = " + fileID + ", recordID = " + recordID + ")";
}
catch (Exception e)
{
Added: trunk/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2010-09-10
17:44:13 UTC (rev 9668)
@@ -0,0 +1,593 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * This is a helper class for the Journal, which controls access to dataFiles, openedFiles
+ * and freeFiles, guaranteeing that they are delivered to the Journal in order.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
+ *
+ *
+ */
+public class JournalFilesRepository
+{
+
+ private static final Logger log = Logger.getLogger(JournalFilesRepository.class);
+
+ private static final boolean trace = JournalFilesRepository.log.isTraceEnabled();
+
+   // This method exists just to make debugging easier.
+ // I could replace log.trace by log.info temporarily while I was debugging
+ // Journal
+ private static final void trace(final String message)
+ {
+ JournalFilesRepository.log.trace(message);
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final SequentialFileFactory fileFactory;
+
+   private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
+
+   private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
+
+   private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
+
+   private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
+
+ private final AtomicLong nextFileID = new AtomicLong(0);
+
+ private final int maxAIO;
+
+ private final int minFiles;
+
+ private final int fileSize;
+
+ private final String filePrefix;
+
+ private final String fileExtension;
+
+ private final int userVersion;
+
+ private Executor filesExecutor;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public JournalFilesRepository(final SequentialFileFactory fileFactory,
+ final String filePrefix,
+ final String fileExtension,
+ final int userVersion,
+ final int maxAIO,
+ final int fileSize,
+ final int minFiles)
+ {
+ this.fileFactory = fileFactory;
+ this.maxAIO = maxAIO;
+ this.filePrefix = filePrefix;
+ this.fileExtension = fileExtension;
+ this.minFiles = minFiles;
+ this.fileSize = fileSize;
+ this.userVersion = userVersion;
+ }
+
+ // Public --------------------------------------------------------
+
+ public void setExecutor(final Executor executor)
+ {
+ filesExecutor = executor;
+ }
+
+ public void clear()
+ {
+ dataFiles.clear();
+
+ drainClosedFiles();
+
+ freeFiles.clear();
+
+ for (JournalFile file : openedFiles)
+ {
+ try
+ {
+ file.getFile().close();
+ }
+ catch (Exception e)
+ {
+ JournalFilesRepository.log.warn(e.getMessage(), e);
+ }
+ }
+ openedFiles.clear();
+ }
+
+ public int getMaxAIO()
+ {
+ return maxAIO;
+ }
+
+ public String getFileExtension()
+ {
+ return fileExtension;
+ }
+
+ public String getFilePrefix()
+ {
+ return filePrefix;
+ }
+
+ public void calculateNextfileID(final List<JournalFile> files)
+ {
+
+ for (JournalFile file : files)
+ {
+ long fileID = file.getFileID();
+ if (nextFileID.get() < fileID)
+ {
+ nextFileID.set(fileID);
+ }
+
+ long fileNameID = getFileNameID(file.getFile().getFileName());
+
+ // The compactor could create a fileName but use a previously assigned ID.
+ // Because of that we need to take both parts into account
+ if (nextFileID.get() < fileNameID)
+ {
+ nextFileID.set(fileNameID);
+ }
+ }
+
+ }
+
+ public void ensureMinFiles() throws Exception
+ {
+ // FIXME - size() involves a scan
+ int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
+
+ if (filesToCreate > 0)
+ {
+ for (int i = 0; i < filesToCreate; i++)
+ {
+ // Keeping all files opened can be very costly (mainly on AIO)
+ freeFiles.add(createFile(false, false, true, false));
+ }
+ }
+
+ }
+
+ public void openFile(final JournalFile file, final boolean multiAIO) throws Exception
+ {
+ if (multiAIO)
+ {
+ file.getFile().open();
+ }
+ else
+ {
+ file.getFile().open(1, false);
+ }
+
+
+      file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
+   }
+
+ // Data File Operations ==========================================
+
+ public JournalFile[] getDataFilesArray()
+ {
+ return dataFiles.toArray(new JournalFile[dataFiles.size()]);
+ }
+
+ public JournalFile pollLastDataFile()
+ {
+ return dataFiles.pollLast();
+ }
+
+ public void removeDataFile(final JournalFile file)
+ {
+ if (!dataFiles.remove(file))
+ {
+         JournalFilesRepository.log.warn("Could not remove file " + file + " from the list of data files");
+ }
+ }
+
+ public int getDataFilesCount()
+ {
+ return dataFiles.size();
+ }
+
+ public Collection<JournalFile> getDataFiles()
+ {
+ return dataFiles;
+ }
+
+ public void clearDataFiles()
+ {
+ dataFiles.clear();
+ }
+
+ public void addDataFileOnTop(final JournalFile file)
+ {
+ dataFiles.addFirst(file);
+ }
+
+ public void addDataFileOnBottom(final JournalFile file)
+ {
+ dataFiles.add(file);
+ }
+
+ // Free File Operations ==========================================
+
+ public int getFreeFilesCount()
+ {
+ return freeFiles.size();
+ }
+
+ /**
+ * Add directly to the freeFiles structure without reinitializing the file.
+    * Used on load() only.
+ */
+ public void addFreeFileNoInit(final JournalFile file)
+ {
+ freeFiles.add(file);
+ }
+
+ /**
+ * @param file
+ * @throws Exception
+ */
+ public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp)
throws Exception
+ {
+ if (file.getFile().size() != fileSize)
+ {
+         JournalFilesRepository.log.warn("Deleting " + file + ".. as it doesn't have the configured size");
+ file.getFile().delete();
+ }
+ else
+ // FIXME - size() involves a scan!!!
+ if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+ {
+ // Re-initialise it
+
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("Adding free file " + file);
+ }
+
+ JournalFile jf = reinitializeFile(file);
+
+ if (renameTmp)
+ {
+
+            jf.getFile().renameTo(JournalImpl.renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
+         }
+
+ freeFiles.add(jf);
+ }
+ else
+ {
+ file.getFile().delete();
+ }
+ }
+
+ public Collection<JournalFile> getFreeFiles()
+ {
+ return freeFiles;
+ }
+
+ public JournalFile getFreeFile()
+ {
+ return freeFiles.remove();
+ }
+
+ // Opened files operations =======================================
+
+ public int getOpenedFilesCount()
+ {
+ return openedFiles.size();
+ }
+
+ public void drainClosedFiles()
+ {
+ JournalFile file;
+ try
+ {
+ while ((file = pendingCloseFiles.poll()) != null)
+ {
+ file.getFile().close();
+ }
+ }
+ catch (Exception e)
+ {
+ JournalFilesRepository.log.warn(e.getMessage(), e);
+ }
+
+ }
+
+   /**
+    * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
+    * <p>In case there are no cached opened files, this method will block until a file is opened,
+    * which should happen only if the system is under heavy load from another process (like a
+    * backup system, or a DB sharing the same box as HornetQ).</p>
+    */
+ public JournalFile openFile() throws InterruptedException
+ {
+ if (JournalFilesRepository.trace)
+ {
+         JournalFilesRepository.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
+ }
+
+ Runnable run = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ pushOpenedFile();
+ }
+ catch (Exception e)
+ {
+ JournalFilesRepository.log.error(e.getMessage(), e);
+ }
+ }
+ };
+
+ if (filesExecutor == null)
+ {
+ run.run();
+ }
+ else
+ {
+ filesExecutor.execute(run);
+ }
+
+ JournalFile nextFile = null;
+
+ while (nextFile == null)
+ {
+ nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
+ if (nextFile == null)
+ {
+             JournalFilesRepository.log.warn("Couldn't open a file in 5 seconds",
+                                             new Exception("Warning: Couldn't open a file in 5 seconds"));
+ }
+ }
+
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("Returning file " + nextFile);
+ }
+
+ return nextFile;
+ }
+
+ /**
+ *
+ * Open a file and place it into the openedFiles queue
+ * */
+ public void pushOpenedFile() throws Exception
+ {
+ JournalFile nextOpenedFile = takeFile(true, true, true, false);
+
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("pushing openFile " + nextOpenedFile);
+ }
+
+ openedFiles.offer(nextOpenedFile);
+ }
+
+ public void closeFile(final JournalFile file)
+ {
+ fileFactory.deactivateBuffer();
+ pendingCloseFiles.add(file);
+ dataFiles.add(file);
+
+ Runnable run = new Runnable()
+ {
+ public void run()
+ {
+ drainClosedFiles();
+ }
+ };
+
+ if (filesExecutor == null)
+ {
+ run.run();
+ }
+ else
+ {
+ filesExecutor.execute(run);
+ }
+
+ }
+
+ /**
+ * This will get a File from freeFile without initializing it
+ * @return
+ * @throws Exception
+ */
+ public JournalFile takeFile(final boolean keepOpened,
+ final boolean multiAIO,
+ final boolean initFile,
+ final boolean tmpCompactExtension) throws Exception
+ {
+ JournalFile nextFile = null;
+
+ nextFile = freeFiles.poll();
+
+ if (nextFile == null)
+ {
+ nextFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
+ }
+ else
+ {
+ if (tmpCompactExtension)
+ {
+ SequentialFile sequentialFile = nextFile.getFile();
+ sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+ }
+
+ if (keepOpened)
+ {
+ openFile(nextFile, multiAIO);
+ }
+ }
+ return nextFile;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ /**
+ * This method will create a new file on the file system, pre-fill it with
FILL_CHARACTER
+ * @param keepOpened
+ * @return
+ * @throws Exception
+ */
+ private JournalFile createFile(final boolean keepOpened,
+ final boolean multiAIO,
+ final boolean init,
+ final boolean tmpCompact) throws Exception
+ {
+ long fileID = generateFileID();
+
+ String fileName;
+
+ fileName = createFileName(tmpCompact, fileID);
+
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("Creating file " + fileName);
+ }
+
+ String tmpFileName = fileName + ".tmp";
+
+ SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName,
maxAIO);
+
+ sequentialFile.open(1, false);
+
+ if (init)
+ {
+ sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
+
+ JournalImpl.initFileHeader(fileFactory, sequentialFile, userVersion, fileID);
+ }
+
+ long position = sequentialFile.position();
+
+ sequentialFile.close();
+
+ if (JournalFilesRepository.trace)
+ {
+ JournalFilesRepository.trace("Renaming file " + tmpFileName + "
as " + fileName);
+ }
+
+ sequentialFile.renameTo(fileName);
+
+ if (keepOpened)
+ {
+ if (multiAIO)
+ {
+ sequentialFile.open();
+ }
+ else
+ {
+ sequentialFile.open(1, false);
+ }
+ sequentialFile.position(position);
+ }
+
+ return new JournalFileImpl(sequentialFile, fileID, JournalImpl.FORMAT_VERSION);
+ }
+
+ /**
+ * @param tmpCompact
+ * @param fileID
+ * @return
+ */
+ private String createFileName(final boolean tmpCompact, final long fileID)
+ {
+ String fileName;
+ if (tmpCompact)
+ {
+         fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
+ }
+ else
+ {
+ fileName = filePrefix + "-" + fileID + "." + fileExtension;
+ }
+ return fileName;
+ }
+
+ private long generateFileID()
+ {
+ return nextFileID.incrementAndGet();
+ }
+
+ /** Get the ID part of the name */
+ private long getFileNameID(final String fileName)
+ {
+ try
+ {
+         return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
+ }
+ catch (Throwable e)
+ {
+         JournalFilesRepository.log.warn("Impossible to get the ID part of the file name " + fileName, e);
+ return 0;
+ }
+ }
+
+ // Discard the old JournalFile and set it with a new ID
+ private JournalFile reinitializeFile(final JournalFile file) throws Exception
+ {
+ long newFileID = generateFileID();
+
+ SequentialFile sf = file.getFile();
+
+ sf.open(1, false);
+
+ int position = JournalImpl.initFileHeader(fileFactory, sf, userVersion,
newFileID);
+
+ JournalFile jf = new JournalFileImpl(sf, newFileID, JournalImpl.FORMAT_VERSION);
+
+ sf.position(position);
+
+ sf.close();
+
+ return jf;
+ }
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-09-10 15:23:04 UTC
(rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-09-10 17:44:13 UTC
(rev 9668)
@@ -24,16 +24,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -68,7 +63,6 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.DataConstants;
-
/**
*
 * <p>A circular log implementation.</p>
@@ -91,23 +85,32 @@
private static final int STATE_LOADED = 2;
public static final int FORMAT_VERSION = 2;
-
- private static final int COMPATIBLE_VERSIONS[] = new int[] {1};
+ private static final int COMPATIBLE_VERSIONS[] = new int[] { 1 };
+
// Static --------------------------------------------------------
private static final Logger log = Logger.getLogger(JournalImpl.class);
- private static final boolean trace = log.isTraceEnabled();
+ private static final boolean trace = JournalImpl.log.isTraceEnabled();
+   // This is useful at debug time...
+   // if you set it to true, all the appends, deletes, rollbacks, commits, etc. are traced through traceRecord
+   private static final boolean TRACE_RECORDS = false;
+
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
// Journal
private static final void trace(final String message)
{
- log.trace(message);
+ JournalImpl.log.trace(message);
}
+ private static final void traceRecord(final String message)
+ {
+ JournalImpl.log.trace(message);
+ }
+
// The sizes of primitive types
public static final int MIN_FILE_SIZE = 1024;
@@ -167,12 +170,8 @@
private volatile boolean autoReclaim = true;
- private final AtomicLong nextFileID = new AtomicLong(0);
-
private final int userVersion;
- private final int maxAIO;
-
private final int fileSize;
private final int minFiles;
@@ -183,18 +182,8 @@
private final SequentialFileFactory fileFactory;
- public final String filePrefix;
+ private final JournalFilesRepository filesRepository;
- public final String fileExtension;
-
- private final BlockingDeque<JournalFile> dataFiles = new
LinkedBlockingDeque<JournalFile>();
-
- private final BlockingQueue<JournalFile> pendingCloseFiles = new
LinkedBlockingDeque<JournalFile>();
-
- private final ConcurrentLinkedQueue<JournalFile> freeFiles = new
ConcurrentLinkedQueue<JournalFile>();
-
- private final BlockingQueue<JournalFile> openedFiles = new
LinkedBlockingQueue<JournalFile>();
-
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalRecord> records = new
ConcurrentHashMap<Long, JournalRecord>();
@@ -301,12 +290,14 @@
this.fileFactory = fileFactory;
- this.filePrefix = filePrefix;
+ filesRepository = new JournalFilesRepository(fileFactory,
+ filePrefix,
+ fileExtension,
+ userVersion,
+ maxAIO,
+ fileSize,
+ minFiles);
- this.fileExtension = fileExtension;
-
- this.maxAIO = maxAIO;
-
this.userVersion = userVersion;
}
@@ -390,19 +381,19 @@
* It won't be part of the interface as the tools should be specific to the
implementation */
public List<JournalFile> orderFiles() throws Exception
{
- List<String> fileNames = fileFactory.listFiles(fileExtension);
+ List<String> fileNames =
fileFactory.listFiles(filesRepository.getFileExtension());
List<JournalFile> orderedFiles = new
ArrayList<JournalFile>(fileNames.size());
for (String fileName : fileNames)
{
- SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
+ SequentialFile file = fileFactory.createSequentialFile(fileName,
filesRepository.getMaxAIO());
file.open(1, false);
try
{
-
+
JournalFileImpl jrnFile = readFileHeader(file);
orderedFiles.add(jrnFile);
@@ -421,29 +412,6 @@
return orderedFiles;
}
- private void calculateNextfileID(List<JournalFile> files)
- {
-
- for (JournalFile file : files)
- {
- long fileID = file.getFileID();
- if (nextFileID.get() < fileID)
- {
- nextFileID.set(fileID);
- }
-
- long fileNameID = getFileNameID(file.getFile().getFileName());
-
- // The compactor could create a fileName but use a previously assigned ID.
- // Because of that we need to take both parts into account
- if (nextFileID.get() < fileNameID)
- {
- nextFileID.set(fileNameID);
- }
- }
-
- }
-
/** this method is used internally only however tools may use it to maintenance. */
public static int readJournalFile(final SequentialFileFactory fileFactory,
final JournalFile file,
@@ -505,20 +473,20 @@
}
short compactCount = 0;
-
+
if (file.getJournalVersion() >= 2)
{
if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(),
DataConstants.SIZE_BYTE))
{
reader.markAsDataFile(file);
-
+
wholeFileBuffer.position(pos + 1);
continue;
}
-
+
compactCount = wholeFileBuffer.get();
}
-
+
long transactionID = 0;
if (JournalImpl.isTransaction(recordType))
@@ -714,19 +682,31 @@
case ADD_RECORD_TX:
{
- reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID,
userRecordType, record, false, compactCount));
+ reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID,
+ userRecordType,
+ record,
+ false,
+ compactCount));
break;
}
case UPDATE_RECORD_TX:
{
- reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID,
userRecordType, record, true, compactCount));
+ reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID,
+
userRecordType,
+ record,
+ true,
+
compactCount));
break;
}
case DELETE_RECORD_TX:
{
- reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID,
(byte)0, record, true, compactCount));
+ reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID,
+ (byte)0,
+ record,
+ true,
+
compactCount));
break;
}
@@ -780,7 +760,7 @@
}
catch (Throwable e)
{
- log.warn(e.getMessage(), e);
+ JournalImpl.log.warn(e.getMessage(), e);
throw new Exception(e.getMessage(), e);
}
finally
@@ -856,6 +836,15 @@
{
JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendAddRecord::id=" + id +
+ ", userRecordType=" +
+ recordType +
+ ", usedFile = " +
+ usedFile);
+ }
+
records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
}
finally
@@ -932,6 +921,15 @@
{
JournalFile usedFile = appendRecord(updateRecord, false, sync, null,
callback);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendUpdateRecord::id=" + id +
+ ", userRecordType=" +
+ recordType +
+ ", usedFile = " +
+ usedFile);
+ }
+
// record== null here could only mean there is a compactor, and computing the
delete should be done after
// compacting is done
if (jrnRecord == null)
@@ -1008,6 +1006,11 @@
{
JournalFile usedFile = appendRecord(deleteRecord, false, sync, null,
callback);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendDeleteRecord::id=" + id + ",
usedFile = " + usedFile);
+ }
+
// record== null here could only mean there is a compactor, and computing the
delete should be done after
// compacting is done
if (record == null)
@@ -1060,6 +1063,17 @@
{
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendAddRecordTransactional:txID=" +
txID +
+ ",id=" +
+ id +
+ ", userRecordType=" +
+ recordType +
+ ", usedFile = " +
+ usedFile);
+ }
+
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
}
finally
@@ -1104,6 +1118,17 @@
{
JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendUpdateRecordTransactional::txID="
+ txID +
+ ",id=" +
+ id +
+ ", userRecordType=" +
+ recordType +
+ ", usedFile = " +
+ usedFile);
+ }
+
tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
}
finally
@@ -1142,6 +1167,15 @@
{
JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendDeleteRecordTransactional::txID="
+ txID +
+ ", id=" +
+ id +
+ ", usedFile = " +
+ usedFile);
+ }
+
tx.addNegative(usedFile, id);
}
finally
@@ -1231,6 +1265,11 @@
{
JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx,
callback);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendPrepareRecord::txID=" + txID +
", usedFile = " + usedFile);
+ }
+
tx.prepare(usedFile);
}
finally
@@ -1304,6 +1343,11 @@
{
JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("appendCommitRecord::txID=" + txID +
", usedFile = " + usedFile);
+ }
+
tx.commit(usedFile);
}
finally
@@ -1433,9 +1477,9 @@
private void checkDeleteSize()
{
// HORNETQ-482 - Flush deletes only if memory is critical
- if (recordsToDelete.size() > DELETE_FLUSH && (runtime.freeMemory()
< (runtime.maxMemory() * 0.2)))
+ if (recordsToDelete.size() > DELETE_FLUSH && runtime.freeMemory()
< runtime.maxMemory() * 0.2)
{
- log.debug("Flushing deletes during loading, deleteCount = " +
recordsToDelete.size());
+ JournalImpl.log.debug("Flushing deletes during loading, deleteCount =
" + recordsToDelete.size());
// Clean up when the list is too large, or it won't be possible to
load large sets of files
// Done as part of JBMESSAGING-1678
Iterator<RecordInfo> iter = records.iterator();
@@ -1451,7 +1495,7 @@
recordsToDelete.clear();
- log.debug("flush delete done");
+ JournalImpl.log.debug("flush delete done");
}
}
@@ -1514,18 +1558,22 @@
throw new IllegalStateException("There is pending compacting
operation");
}
- ArrayList<JournalFile> dataFilesToProcess = new
ArrayList<JournalFile>(dataFiles.size());
+ ArrayList<JournalFile> dataFilesToProcess = new
ArrayList<JournalFile>(filesRepository.getDataFilesCount());
boolean previousReclaimValue = autoReclaim;
try
{
- if (trace)
+ if (JournalImpl.trace)
{
JournalImpl.trace("Starting compacting operation on journal");
}
- JournalImpl.log.debug("Starting compacting operation on journal");
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("Starting compacting operation on
journal");
+ }
+
onCompactStart();
// We need to guarantee that the journal is frozen for this short time
@@ -1543,22 +1591,26 @@
setAutoReclaim(false);
// We need to move to the next file, as we need a clear start for negatives
and positives counts
- moveNextFile(true);
+ moveNextFile(false);
+ filesRepository.drainClosedFiles();
+
// Take the snapshots and replace the structures
- dataFilesToProcess.addAll(dataFiles);
+ dataFilesToProcess.addAll(filesRepository.getDataFiles());
- dataFiles.clear();
+ filesRepository.clearDataFiles();
- drainClosedFiles();
-
if (dataFilesToProcess.size() == 0)
{
return;
}
- compactor = new JournalCompactor(fileFactory, this, records.keySet(),
dataFilesToProcess.get(0).getFileID());
+ compactor = new JournalCompactor(fileFactory,
+ this,
+ filesRepository,
+ records.keySet(),
+ dataFilesToProcess.get(0).getFileID());
for (Map.Entry<Long, JournalTransaction> entry :
transactions.entrySet())
{
@@ -1590,7 +1642,7 @@
}
catch (Throwable e)
{
- log.warn("Error on reading compacting for " + file);
+ JournalImpl.log.warn("Error on reading compacting for " +
file);
throw new Exception("Error on reading compacting for " + file,
e);
}
}
@@ -1632,12 +1684,12 @@
{
JournalImpl.trace("Adding file " + fileToAdd + " back as
datafile");
}
- dataFiles.addFirst(fileToAdd);
+ filesRepository.addDataFileOnTop(fileToAdd);
}
- if (trace)
+ if (JournalImpl.trace)
{
- JournalImpl.trace("There are " + dataFiles.size() + "
datafiles Now");
+ JournalImpl.trace("There are " +
filesRepository.getDataFilesCount() + " datafiles Now");
}
// Replay pending commands (including updates, deletes and commits)
@@ -1666,7 +1718,7 @@
}
else
{
- log.warn("Couldn't find tx=" + newTransaction.getId() +
" to merge after compacting");
+ JournalImpl.log.warn("Couldn't find tx=" +
newTransaction.getId() + " to merge after compacting");
}
}
}
@@ -1679,11 +1731,16 @@
renameFiles(dataFilesToProcess, newDatafiles);
deleteControlFile(controlFile);
- if (trace)
+ if (JournalImpl.trace)
{
JournalImpl.log.debug("Finished compacting on journal");
}
+ if (JournalImpl.TRACE_RECORDS)
+ {
+ JournalImpl.traceRecord("Finished compacting on journal");
+ }
+
}
finally
{
@@ -1753,21 +1810,15 @@
records.clear();
- dataFiles.clear();
+ filesRepository.clear();
- pendingCloseFiles.clear();
-
- freeFiles.clear();
-
- openedFiles.clear();
-
transactions.clear();
final Map<Long, TransactionHolder> loadTransactions = new
LinkedHashMap<Long, TransactionHolder>();
final List<JournalFile> orderedFiles = orderFiles();
- calculateNextfileID(orderedFiles);
+ filesRepository.calculateNextfileID(orderedFiles);
int lastDataPos = JournalImpl.SIZE_HEADER;
@@ -1817,7 +1868,8 @@
// have been deleted
// just leaving some updates in this file
- posFiles.addUpdateFile(file, info.data.length +
JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact count
+ posFiles.addUpdateFile(file, info.data.length +
JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact
+ // count
}
}
@@ -1862,12 +1914,13 @@
if (tnp == null)
{
- tnp = new JournalTransaction(info.id, JournalImpl.this);
+ tnp = new JournalTransaction(transactionID, JournalImpl.this);
transactions.put(transactionID, tnp);
}
- tnp.addPositive(file, info.id, info.data.length +
JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact count
+ tnp.addPositive(file, info.id, info.data.length +
JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact
+ // count
}
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo
info) throws Exception
@@ -2030,43 +2083,23 @@
if (hasData.get())
{
lastDataPos = resultLastPost;
- dataFiles.add(file);
+ filesRepository.addDataFileOnBottom(file);
}
else
{
// Empty dataFiles with no data
- freeFiles.add(file);
+ filesRepository.addFreeFileNoInit(file);
}
}
// Create any more files we need
- // FIXME - size() involves a scan
- int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
+ filesRepository.ensureMinFiles();
- if (filesToCreate > 0)
- {
- for (int i = 0; i < filesToCreate; i++)
- {
- // Keeping all files opened can be very costly (mainly on AIO)
- freeFiles.add(createFile(false, false, true, false));
- }
- }
+ // The current file is the last one that has data
- // The current file is the last one
+ currentFile = filesRepository.pollLastDataFile();
- Iterator<JournalFile> iter = dataFiles.iterator();
-
- while (iter.hasNext())
- {
- currentFile = iter.next();
-
- if (!iter.hasNext())
- {
- iter.remove();
- }
- }
-
if (currentFile != null)
{
currentFile.getFile().open();
@@ -2075,15 +2108,17 @@
}
else
{
- currentFile = freeFiles.remove();
+ currentFile = filesRepository.getFreeFile();
- openFile(currentFile, true);
+ filesRepository.openFile(currentFile, true);
}
fileFactory.activateBuffer(currentFile.getFile());
- pushOpenedFile();
+ filesRepository.pushOpenedFile();
+ state = JournalImpl.STATE_LOADED;
+
for (TransactionHolder transaction : loadTransactions.values())
{
if (!transaction.prepared || transaction.invalid)
@@ -2091,19 +2126,9 @@
JournalImpl.log.warn("Uncommitted transaction with id " +
transaction.transactionID +
" found and discarded");
-            JournalTransaction transactionInfo = transactions.get(transaction.transactionID);
+            // Append a rollback record here; otherwise compacting would log warnings about unknown transactions
+            this.appendRollbackRecord(transaction.transactionID, false);
- if (transactionInfo == null)
- {
- throw new IllegalStateException("Cannot find tx " +
transaction.transactionID);
- }
-
- // Reverse the refs
- transactionInfo.forget();
-
- // Remove the transactionInfo
- transactions.remove(transaction.transactionID);
-
loadManager.failedTransaction(transaction.transactionID,
transaction.recordInfos,
transaction.recordsToDelete);
@@ -2128,8 +2153,6 @@
}
}
- state = JournalImpl.STATE_LOADED;
-
checkReclaimStatus();
return new JournalLoadInformation(records.size(), maxID.longValue());
@@ -2152,7 +2175,7 @@
{
reclaimer.scan(getDataFiles());
- for (JournalFile file : dataFiles)
+ for (JournalFile file : filesRepository.getDataFiles())
{
if (file.isCanReclaim())
{
@@ -2162,12 +2185,9 @@
JournalImpl.trace("Reclaiming file " + file);
}
- if (!dataFiles.remove(file))
- {
- JournalImpl.log.warn("Could not remove file " + file);
- }
+ filesRepository.removeDataFile(file);
- addFreeFile(file, false);
+ filesRepository.addFreeFile(file, false);
}
}
}
@@ -2179,9 +2199,6 @@
return false;
}
-
- int deleteme = 0;
-
private boolean needsCompact() throws Exception
{
JournalFile[] dataFiles = getDataFiles();
@@ -2196,9 +2213,9 @@
long totalBytes = (long)dataFiles.length * (long)fileSize;
long compactMargin = (long)(totalBytes * compactPercentage);
-
- boolean needCompact = (totalLiveSize < compactMargin &&
!compactorRunning.get() && dataFiles.length > compactMinFiles);
+ boolean needCompact = totalLiveSize < compactMargin && dataFiles.length
> compactMinFiles;
+
return needCompact;
}
@@ -2216,35 +2233,40 @@
return;
}
- if (needsCompact())
+ if (!compactorRunning.get() && needsCompact())
{
- if (!compactorRunning.compareAndSet(false, true))
- {
- return;
- }
+ scheduleCompact();
+ }
+ }
- // We can't use the executor for the compacting... or we would dead lock
because of file open and creation
- // operations (that will use the executor)
- compactorExecutor.execute(new Runnable()
+ private void scheduleCompact()
+ {
+ if (!compactorRunning.compareAndSet(false, true))
+ {
+ return;
+ }
+
+ // We can't use the executor for the compacting... or we would dead lock
because of file open and creation
+ // operations (that will use the executor)
+ compactorExecutor.execute(new Runnable()
+ {
+ public void run()
{
- public void run()
- {
- try
- {
- JournalImpl.this.compact();
- }
- catch (Throwable e)
- {
- JournalImpl.log.error(e.getMessage(), e);
- }
- finally
- {
- compactorRunning.set(false);
- }
+ try
+ {
+ JournalImpl.this.compact();
}
- });
- }
+ catch (Throwable e)
+ {
+ JournalImpl.log.error(e.getMessage(), e);
+ }
+ finally
+ {
+ compactorRunning.set(false);
+ }
+ }
+ });
}
// TestableJournal implementation
@@ -2266,7 +2288,7 @@
StringBuilder builder = new StringBuilder();
- for (JournalFile file : dataFiles)
+ for (JournalFile file : filesRepository.getDataFiles())
{
builder.append("DataFile:" + file +
" posCounter = " +
@@ -2283,7 +2305,7 @@
}
}
- for (JournalFile file : freeFiles)
+ for (JournalFile file : filesRepository.getFreeFiles())
{
builder.append("FreeFile:" + file + "\n");
}
@@ -2302,8 +2324,6 @@
builder.append("CurrentFile: No current file at this point!");
}
- builder.append("#Opened Files:" + openedFiles.size());
-
return builder.toString();
}
@@ -2339,22 +2359,22 @@
public int getDataFilesCount()
{
- return dataFiles.size();
+ return filesRepository.getDataFilesCount();
}
public JournalFile[] getDataFiles()
{
- return (JournalFile[])dataFiles.toArray(new JournalFile[dataFiles.size()]);
+ return filesRepository.getDataFilesArray();
}
public int getFreeFilesCount()
{
- return freeFiles.size();
+ return filesRepository.getFreeFilesCount();
}
public int getOpenedFilesCount()
{
- return openedFiles.size();
+ return filesRepository.getOpenedFilesCount();
}
public int getIDMapSize()
@@ -2374,17 +2394,17 @@
public String getFilePrefix()
{
- return filePrefix;
+ return filesRepository.getFilePrefix();
}
public String getFileExtension()
{
- return fileExtension;
+ return filesRepository.getFileExtension();
}
public int getMaxAIO()
{
- return maxAIO;
+ return filesRepository.getMaxAIO();
}
public int getUserVersion()
@@ -2395,23 +2415,13 @@
// In some tests we need to force the journal to move to a next file
public void forceMoveNextFile() throws Exception
{
- forceMoveNextFile(true);
- }
-
- // In some tests we need to force the journal to move to a next file
- public void forceMoveNextFile(final boolean synchronous) throws Exception
- {
compactingLock.readLock().lock();
try
{
lockAppend.lock();
try
{
- moveNextFile(synchronous);
- if (autoReclaim && synchronous)
- {
- checkReclaimStatus();
- }
+ moveNextFile(false);
debugWait();
}
finally
@@ -2448,7 +2458,7 @@
filesExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
{
- public Thread newThread(Runnable r)
+ public Thread newThread(final Runnable r)
{
return new Thread(r, "JournalImpl::FilesExecutor");
}
@@ -2457,12 +2467,14 @@
compactorExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
{
- public Thread newThread(Runnable r)
+ public Thread newThread(final Runnable r)
{
return new Thread(r, "JournalImpl::CompactorExecutor");
}
});
+ filesRepository.setExecutor(filesExecutor);
+
fileFactory.start();
state = JournalImpl.STATE_STARTED;
@@ -2493,6 +2505,8 @@
filesExecutor.shutdown();
+ filesRepository.setExecutor(null);
+
if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
{
JournalImpl.log.warn("Couldn't stop journal executor after 60
seconds");
@@ -2505,20 +2519,13 @@
currentFile.getFile().close();
}
- for (JournalFile file : openedFiles)
- {
- file.getFile().close();
- }
+ filesRepository.drainClosedFiles();
+ filesRepository.clear();
+
fileFactory.stop();
currentFile = null;
-
- dataFiles.clear();
-
- freeFiles.clear();
-
- openedFiles.clear();
}
finally
{
@@ -2578,11 +2585,11 @@
{
try
{
- addFreeFile(file, false);
+ filesRepository.addFreeFile(file, false);
}
catch (Throwable e)
{
- log.warn("Error reinitializing file " + file, e);
+ JournalImpl.log.warn("Error reinitializing file " + file,
e);
}
}
done.countDown();
@@ -2596,7 +2603,7 @@
for (JournalFile file : newFiles)
{
- String newName = renameExtensionFile(file.getFile().getFileName(),
".cmp");
+ String newName = JournalImpl.renameExtensionFile(file.getFile().getFileName(),
".cmp");
file.getFile().renameTo(newName);
}
@@ -2606,7 +2613,7 @@
* @param name
* @return
*/
- private String renameExtensionFile(String name, String extension)
+ protected static String renameExtensionFile(String name, final String extension)
{
name = name.substring(0, name.lastIndexOf(extension));
return name;
@@ -2632,63 +2639,6 @@
// -----------------------------------------------------------------------------
/**
- * @param file
- * @throws Exception
- */
- private void addFreeFile(final JournalFile file, final boolean renameTmp) throws
Exception
- {
- if (file.getFile().size() != this.getFileSize())
- {
- log.warn("Deleting " + file + ".. as it doesn't have the
configured size", new Exception("trace"));
- file.getFile().delete();
- }
- else
- // FIXME - size() involves a scan!!!
- if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
- {
- // Re-initialise it
-
- if (trace)
- {
- trace("Adding free file " + file);
- }
-
- JournalFile jf = reinitializeFile(file);
-
- if (renameTmp)
- {
- jf.getFile().renameTo(renameExtensionFile(jf.getFile().getFileName(),
".tmp"));
- }
-
- freeFiles.add(jf);
- }
- else
- {
- file.getFile().delete();
- }
- }
-
- // Discard the old JournalFile and set it with a new ID
- private JournalFile reinitializeFile(final JournalFile file) throws Exception
- {
- long newFileID = generateFileID();
-
- SequentialFile sf = file.getFile();
-
- sf.open(1, false);
-
- int position = initFileHeader(this.fileFactory, sf, userVersion, newFileID);
-
- JournalFile jf = new JournalFileImpl(sf, newFileID, FORMAT_VERSION);
-
- sf.position(position);
-
- sf.close();
-
- return jf;
- }
-
- /**
* <p> Check for holes on the transaction (a commit written but with an
incomplete transaction) </p>
* <p>This method will validate if the transaction (PREPARE/COMMIT) is complete
as stated on the COMMIT-RECORD.</p>
*
@@ -2779,29 +2729,30 @@
* @return
* @throws Exception
*/
- private JournalFileImpl readFileHeader(SequentialFile file) throws Exception
+ private JournalFileImpl readFileHeader(final SequentialFile file) throws Exception
{
ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
file.read(bb);
int journalVersion = bb.getInt();
-
- if (journalVersion != FORMAT_VERSION)
+
+ if (journalVersion != JournalImpl.FORMAT_VERSION)
{
boolean isCompatible = false;
-
- for (int v : COMPATIBLE_VERSIONS)
+
+ for (int v : JournalImpl.COMPATIBLE_VERSIONS)
{
if (v == journalVersion)
{
isCompatible = true;
}
}
-
+
if (!isCompatible)
{
-            throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch. You should export the data from the previous version and import it as explained on the user's manual");
+            throw new HornetQException(HornetQException.IO_ERROR,
+                                       "Journal files version mismatch. You should export the data from the previous version and import it as explained in the user's manual");
}
}
@@ -2817,7 +2768,7 @@
fileFactory.releaseBuffer(bb);
bb = null;
-
+
return new JournalFileImpl(file, fileID, journalVersion);
}
@@ -2836,7 +2787,7 @@
HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bb);
- writeHeader(buffer, userVersion, fileID);
+ JournalImpl.writeHeader(buffer, userVersion, fileID);
bb.rewind();
@@ -2854,9 +2805,9 @@
* @param userVersion
* @param fileID
*/
- public static void writeHeader(HornetQBuffer buffer, final int userVersion, final long
fileID)
+ public static void writeHeader(final HornetQBuffer buffer, final int userVersion,
final long fileID)
{
- buffer.writeInt(FORMAT_VERSION);
+ buffer.writeInt(JournalImpl.FORMAT_VERSION);
buffer.writeInt(userVersion);
@@ -2890,7 +2841,7 @@
if (!currentFile.getFile().fits(size))
{
- moveNextFile(false);
+ moveNextFile(true);
// The same check needs to be done at the new file also
if (!currentFile.getFile().fits(size))
@@ -2960,196 +2911,26 @@
return currentFile;
}
- /** Get the ID part of the name */
- private long getFileNameID(final String fileName)
+ // You need to guarantee lock.acquire() before calling this method
+ private void moveNextFile(final boolean scheduleReclaim) throws InterruptedException
{
- try
- {
-         return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
- }
- catch (Throwable e)
- {
- JournalImpl.log.warn("Impossible to get the ID part of the file name "
+ fileName, e);
- return 0;
- }
- }
+ filesRepository.closeFile(currentFile);
- /**
-    * This method will create a new file on the file system, pre-fill it with FILL_CHARACTER
- * @param keepOpened
- * @return
- * @throws Exception
- */
- private JournalFile createFile(final boolean keepOpened,
- final boolean multiAIO,
- final boolean init,
- final boolean tmpCompact) throws Exception
- {
- long fileID = generateFileID();
+ currentFile = filesRepository.openFile();
- String fileName;
-
- fileName = createFileName(tmpCompact, fileID);
-
- if (JournalImpl.trace)
+ if (scheduleReclaim)
{
- JournalImpl.trace("Creating file " + fileName);
+ scheduleReclaim();
}
- String tmpFileName = fileName + ".tmp";
-
-      SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
-
- sequentialFile.open(1, false);
-
- if (init)
- {
- sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
-
- initFileHeader(this.fileFactory, sequentialFile, userVersion, fileID);
- }
-
- long position = sequentialFile.position();
-
- sequentialFile.close();
-
if (JournalImpl.trace)
{
- JournalImpl.trace("Renaming file " + tmpFileName + " as " +
fileName);
+ JournalImpl.trace("moveNextFile: " + currentFile);
}
- sequentialFile.renameTo(fileName);
-
- if (keepOpened)
- {
- if (multiAIO)
- {
- sequentialFile.open();
- }
- else
- {
- sequentialFile.open(1, false);
- }
- sequentialFile.position(position);
- }
-
- return new JournalFileImpl(sequentialFile, fileID, FORMAT_VERSION);
- }
-
- /**
- * @param tmpCompact
- * @param fileID
- * @return
- */
- private String createFileName(final boolean tmpCompact, long fileID)
- {
- String fileName;
- if (tmpCompact)
- {
-         fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
- }
- else
- {
- fileName = filePrefix + "-" + fileID + "." + fileExtension;
- }
- return fileName;
- }
-
-   private void openFile(final JournalFile file, final boolean multiAIO) throws Exception
- {
- if (multiAIO)
- {
- file.getFile().open();
- }
- else
- {
- file.getFile().open(1, false);
- }
-
-      file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
- }
-
- private long generateFileID()
- {
- return nextFileID.incrementAndGet();
- }
-
- // You need to guarantee lock.acquire() before calling this method
- private void moveNextFile(final boolean synchronous) throws InterruptedException
- {
- closeFile(currentFile, synchronous);
-
- currentFile = enqueueOpenFile(synchronous);
-
- if (JournalImpl.trace)
- {
- JournalImpl.trace("moveNextFile: " + currentFile + " sync: "
+ synchronous);
- }
-
fileFactory.activateBuffer(currentFile.getFile());
}
- /**
-    * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
-    * <p>In case there are no cached opened files, this method will block until the file was opened,
-    * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p>
- * */
-   private JournalFile enqueueOpenFile(final boolean synchronous) throws InterruptedException
- {
- if (JournalImpl.trace)
- {
- JournalImpl.trace("enqueueOpenFile with openedFiles.size=" +
openedFiles.size());
- }
-
- Runnable run = new Runnable()
- {
- public void run()
- {
- try
- {
- pushOpenedFile();
- }
- catch (Exception e)
- {
- JournalImpl.log.error(e.getMessage(), e);
- }
- }
- };
-
- if (synchronous)
- {
- run.run();
- }
- else
- {
- filesExecutor.execute(run);
- }
-
- if (!synchronous)
- {
- scheduleReclaim();
- }
-
- JournalFile nextFile = null;
-
- while (nextFile == null)
- {
- nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
- if (nextFile == null)
- {
- JournalImpl.log.warn("Couldn't open a file in 60 Seconds",
- new Exception("Warning: Couldn't open a file in
60 Seconds"));
- }
- }
-
- if (trace)
- {
- JournalImpl.trace("Returning file " + nextFile);
- }
-
- return nextFile;
- }
-
private void scheduleReclaim()
{
if (state != JournalImpl.STATE_LOADED)
@@ -3165,7 +2946,7 @@
{
try
{
- drainClosedFiles();
+ filesRepository.drainClosedFiles();
if (!checkReclaimStatus())
{
checkCompact();
@@ -3180,97 +2961,6 @@
}
}
- /**
- *
- * Open a file and place it into the openedFiles queue
- * */
- private void pushOpenedFile() throws Exception
- {
- JournalFile nextOpenedFile = getFile(true, true, true, false);
-
- if (trace)
- {
- JournalImpl.trace("pushing openFile " + nextOpenedFile);
- }
-
- openedFiles.offer(nextOpenedFile);
- }
-
- /**
- * @return
- * @throws Exception
- */
- JournalFile getFile(final boolean keepOpened,
- final boolean multiAIO,
- final boolean initFile,
- final boolean tmpCompactExtension) throws Exception
- {
- JournalFile nextOpenedFile = null;
-
- nextOpenedFile = freeFiles.poll();
-
- if (nextOpenedFile == null)
- {
-         nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
- }
- else
- {
- if (tmpCompactExtension)
- {
- SequentialFile sequentialFile = nextOpenedFile.getFile();
- sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
- }
-
- if (keepOpened)
- {
- openFile(nextOpenedFile, multiAIO);
- }
- }
- return nextOpenedFile;
- }
-
- private void closeFile(final JournalFile file, final boolean synchronous)
- {
- fileFactory.deactivateBuffer();
- pendingCloseFiles.add(file);
- dataFiles.add(file);
-
- Runnable run = new Runnable()
- {
- public void run()
- {
- drainClosedFiles();
- }
- };
-
- if (synchronous)
- {
- run.run();
- }
- else
- {
- filesExecutor.execute(run);
- }
-
- }
-
- private void drainClosedFiles()
- {
- JournalFile file;
- try
- {
- while ((file = pendingCloseFiles.poll()) != null)
- {
- file.getFile().close();
- }
- }
- catch (Exception e)
- {
- JournalImpl.log.warn(e.getMessage(), e);
- }
-
- }
-
private JournalTransaction getTransactionInfo(final long txID)
{
JournalTransaction tx = transactions.get(txID);
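The deletions above move the whole file life cycle (create, pre-open, free, close, drain) out of JournalImpl into the new JournalFilesRepository class added by this commit. A minimal sketch of the resulting collaboration; only closeFile(), openFile() and drainClosedFiles() appear in this diff, so the collections and method bodies below are assumptions, not the actual class:

   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.LinkedBlockingDeque;

   // Illustrative shape only. JournalImpl.moveNextFile() now reduces to
   // filesRepository.closeFile(currentFile) followed by
   // currentFile = filesRepository.openFile(), with opens and closes
   // happening off the append path.
   class FilesRepositorySketch
   {
      private final LinkedBlockingDeque<JournalFile> openedFiles = new LinkedBlockingDeque<JournalFile>();

      private final ConcurrentLinkedQueue<JournalFile> pendingCloses = new ConcurrentLinkedQueue<JournalFile>();

      JournalFile openFile() throws InterruptedException
      {
         // equivalent of the deleted enqueueOpenFile(): hand out a pre-opened file
         return openedFiles.take();
      }

      void closeFile(final JournalFile file)
      {
         // equivalent of the deleted closeFile(): defer the actual close
         pendingCloses.add(file);
      }

      void drainClosedFiles() throws Exception
      {
         JournalFile file;
         while ((file = pendingCloses.poll()) != null)
         {
            file.getFile().close();
         }
      }
   }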
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalRecord.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalRecord.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -74,6 +74,7 @@
}
}
+ @Override
public String toString()
{
StringBuffer buffer = new StringBuffer();
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -29,6 +29,6 @@
public interface JournalRecordProvider
{
JournalCompactor getCompactor();
-
+
Map<Long, JournalRecord> getRecords();
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -61,10 +61,10 @@
this.id = id;
this.journal = journal;
}
-
- public void replaceRecordProvider(JournalRecordProvider provider)
+
+ public void replaceRecordProvider(final JournalRecordProvider provider)
{
- this.journal = provider;
+ journal = provider;
}
/**
@@ -329,7 +329,7 @@
else
{
JournalRecord posFiles = journal.getRecords().remove(trDelete.id);
-
+
if (posFiles != null)
{
posFiles.delete(trDelete.file);
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -260,14 +260,12 @@
{
internalWrite(bytes, sync, null);
}
-
-
- public void writeInternal(ByteBuffer bytes) throws Exception
+
+ public void writeInternal(final ByteBuffer bytes) throws Exception
{
internalWrite(bytes, true, null);
}
-
@Override
protected ByteBuffer newBuffer(int size, final int limit)
{
@@ -292,7 +290,7 @@
}
return;
}
-
+
position.addAndGet(bytes.limit());
if (maxIOSemaphore == null)
Modified: trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -61,20 +61,24 @@
{
Reclaimer.trace("posCount on " + currentFile + " = " +
posCount);
}
-
+
for (int j = i; j < files.length; j++)
{
if (Reclaimer.trace)
{
if (files[j].getNegCount(currentFile) != 0)
{
- Reclaimer.trace("Negative from " + files[j] + " into
" + currentFile + " = " + files[j].getNegCount(currentFile));
+ Reclaimer.trace("Negative from " + files[j] +
+ " into " +
+ currentFile +
+ " = " +
+ files[j].getNegCount(currentFile));
}
}
totNeg += files[j].getNegCount(currentFile);
}
-
+
currentFile.setCanReclaim(true);
if (posCount <= totNeg)
@@ -99,7 +103,7 @@
{
               Reclaimer.trace(currentFile + " Can't be reclaimed because " + file + " has negative values");
}
-
+
currentFile.setCanReclaim(false);
break;
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -127,7 +127,6 @@
return;
}
-
// Need to start with the spin limiter acquired
try
{
@@ -207,7 +206,7 @@
{
throw new IllegalStateException("TimedBuffer is not started");
}
-
+
if (sizeChecked > bufferSize)
{
throw new IllegalStateException("Can't write records bigger than the
bufferSize(" + bufferSize +
@@ -259,7 +258,7 @@
{
throw new IllegalStateException("TimedBuffer is not started");
}
-
+
delayFlush = false;
bytes.encode(buffer);
@@ -306,7 +305,7 @@
{
throw new IllegalStateException("TimedBuffer is not started");
}
-
+
if ((force || !delayFlush) && buffer.writerIndex() > 0)
{
int pos = buffer.writerIndex();
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -40,7 +40,7 @@
{
private final boolean isCommit;
- private final long txID;
+ public final long txID;
private final EncodingSupport transactionData;
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -25,7 +25,7 @@
*/
public class JournalRollbackRecordTX extends JournalInternalRecord
{
- private final long txID;
+ public final long txID;
public JournalRollbackRecordTX(final long txID)
{
Modified: trunk/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -14,6 +14,8 @@
package org.hornetq.core.paging;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.transaction.Transaction;
/**
*
@@ -37,10 +39,15 @@
void setRecordID(long id);
long getTransactionID();
+
+   void store(StorageManager storageManager, Transaction tx) throws Exception;
+
+   void storeUpdate(StorageManager storageManager, Transaction tx, int depages) throws Exception;
- int increment();
+   // To be used after the update was stored or reloaded
+ void update(int update, StorageManager storageManager);
- int decrement();
+ void increment();
int getNumberOfMessages();
Modified: trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -18,7 +18,11 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.utils.DataConstants;
/**
@@ -30,11 +34,13 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PageTransactionInfoImpl.class);
+
// Attributes ----------------------------------------------------
private long transactionID;
- private volatile long recordID;
+ private volatile long recordID = -1;
private volatile CountDownLatch countDownCompleted;
@@ -42,7 +48,7 @@
private volatile boolean rolledback;
- private final AtomicInteger numberOfMessages = new AtomicInteger(0);
+ private AtomicInteger numberOfMessages = new AtomicInteger(0);
// Static --------------------------------------------------------
@@ -75,21 +81,25 @@
return transactionID;
}
- public int increment()
+ public void update(final int update, final StorageManager storageManager)
{
- return numberOfMessages.incrementAndGet();
+ int sizeAfterUpdate = numberOfMessages.addAndGet(-update);
+ if (sizeAfterUpdate == 0 && storageManager != null)
+ {
+ try
+ {
+ storageManager.deletePageTransactional(this.recordID);
+ }
+ catch (Exception e)
+ {
+ log.warn("Can't delete page transaction id=" + this.recordID);
+ }
+ }
}
- public int decrement()
+ public void increment()
{
- final int value = numberOfMessages.decrementAndGet();
-
- if (value < 0)
- {
- throw new IllegalStateException("Internal error Negative value on Paging
transactions!");
- }
-
- return value;
+ numberOfMessages.incrementAndGet();
}
public int getNumberOfMessages()
@@ -103,10 +113,8 @@
{
transactionID = buffer.readLong();
numberOfMessages.set(buffer.readInt());
- countDownCompleted = null; // if it is being readed, probably it was
- // committed
- committed = true; // Unless it is a incomplete prepare, which is marked by
- // markIcomplete
+ countDownCompleted = null;
+ committed = true;
}
public synchronized void encode(final HornetQBuffer buffer)
@@ -141,6 +149,49 @@
}
}
+   public void store(final StorageManager storageManager, final Transaction tx) throws Exception
+ {
+ storageManager.storePageTransaction(tx.getID(), this);
+ }
+
+ /* (non-Javadoc)
+    * @see org.hornetq.core.paging.PageTransactionInfo#storeUpdate(org.hornetq.core.persistence.StorageManager, org.hornetq.core.transaction.Transaction, int)
+    */
+   public void storeUpdate(final StorageManager storageManager, final Transaction tx, final int depages) throws Exception
+ {
+ storageManager.updatePageTransaction(tx.getID(), this, depages);
+
+ final PageTransactionInfo pgToUpdate = this;
+
+ tx.addOperation(new TransactionOperation()
+ {
+ public void beforeRollback(Transaction tx) throws Exception
+ {
+ }
+
+ public void beforePrepare(Transaction tx) throws Exception
+ {
+ }
+
+ public void beforeCommit(Transaction tx) throws Exception
+ {
+ }
+
+ public void afterRollback(Transaction tx)
+ {
+ }
+
+ public void afterPrepare(Transaction tx)
+ {
+ }
+
+ public void afterCommit(Transaction tx)
+ {
+ pgToUpdate.update(depages, storageManager);
+ }
+ });
+ }
+
public boolean isCommit()
{
return committed;
@@ -166,6 +217,16 @@
countDownCompleted = new CountDownLatch(1);
}
+ public String toString()
+ {
+ return "PageTransactionInfoImpl(transactionID=" + transactionID +
+ ",id=" +
+ recordID +
+ ",numberOfMessages=" +
+ numberOfMessages +
+ ")";
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
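Taken together, the hunks above replace the decrement-per-depage counter with a store-an-update-then-apply-after-commit model. A condensed usage sketch of the new life cycle; storageManager, tx, depageTX and depagedCount stand for whatever the caller has in scope, and the call order is the point:

   PageTransactionInfo pgTX = new PageTransactionInfoImpl(txID);
   pgTX.increment();                          // once per message paged under the TX
   pgTX.store(storageManager, tx);            // journals the PAGE_TRANSACTION add record
   // ... later, while depaging under a separate depage transaction:
   pgTX.storeUpdate(storageManager, depageTX, depagedCount); // journals an update record
   // afterCommit of depageTX the registered TransactionOperation calls
   // pgTX.update(depagedCount, storageManager); once numberOfMessages reaches zero,
   // update() deletes the journal record via deletePageTransactional().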
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -14,8 +14,9 @@
package org.hornetq.core.paging.impl;
import java.text.DecimalFormat;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
@@ -958,7 +959,7 @@
depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE,
Boolean.valueOf(true));
-      HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
+      HashMap<PageTransactionInfo, AtomicInteger> pageTransactionsToUpdate = new HashMap<PageTransactionInfo, AtomicInteger>();
for (PagedMessage pagedMessage : pagedMessages)
{
@@ -978,6 +979,7 @@
final long transactionIdDuringPaging = pagedMessage.getTransactionID();
PageTransactionInfo pageUserTransaction = null;
+ AtomicInteger countPageTX = null;
if (transactionIdDuringPaging >= 0)
{
@@ -992,6 +994,12 @@
}
else
{
+ countPageTX = pageTransactionsToUpdate.get(pageUserTransaction);
+ if (countPageTX == null)
+ {
+ countPageTX = new AtomicInteger();
+ pageTransactionsToUpdate.put(pageUserTransaction, countPageTX);
+ }
// This is to avoid a race condition where messages are depaged
// before the commit arrived
@@ -1036,8 +1044,7 @@
// This needs to be done after routing because of duplication detection
if (pageUserTransaction != null && message.isDurable())
{
- pageUserTransaction.decrement();
- pageTransactionsToUpdate.add(pageUserTransaction);
+ countPageTX.incrementAndGet();
}
}
@@ -1047,21 +1054,12 @@
return false;
}
- for (PageTransactionInfo pageWithTransaction : pageTransactionsToUpdate)
+      for (Map.Entry<PageTransactionInfo, AtomicInteger> entry : pageTransactionsToUpdate.entrySet())
{
// This will set the journal transaction to commit;
depageTransaction.setContainsPersistent();
-
- if (pageWithTransaction.getNumberOfMessages() == 0)
- {
- // numberOfReads==numberOfWrites -> We delete the record
-            storageManager.deletePageTransactional(depageTransaction.getID(), pageWithTransaction.getRecordID());
- pagingManager.removeTransaction(pageWithTransaction.getTransactionID());
- }
- else
- {
-            storageManager.storePageTransaction(depageTransaction.getID(), pageWithTransaction);
- }
+
+         entry.getKey().storeUpdate(storageManager, depageTransaction, entry.getValue().intValue());
}
depageTransaction.commit();
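The depage loop now aggregates one counter per page transaction and issues a single journal update each, instead of touching the live counter once per message. The counting idiom in isolation; lookupTransaction() is a stand-in for the pagingManager lookup done earlier in the real loop:

   Map<PageTransactionInfo, AtomicInteger> toUpdate = new HashMap<PageTransactionInfo, AtomicInteger>();

   for (PagedMessage pagedMessage : pagedMessages)
   {
      PageTransactionInfo pgTX = lookupTransaction(pagedMessage); // null for non-TX pages
      if (pgTX != null)
      {
         AtomicInteger count = toUpdate.get(pgTX);
         if (count == null)
         {
            count = new AtomicInteger();
            toUpdate.put(pgTX, count);
         }
         count.incrementAndGet(); // counted here instead of pgTX.decrement()
      }
   }

   for (Map.Entry<PageTransactionInfo, AtomicInteger> entry : toUpdate.entrySet())
   {
      entry.getKey().storeUpdate(storageManager, depageTransaction, entry.getValue().intValue());
   }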
Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -134,8 +134,10 @@
void rollback(long txID) throws Exception;
   void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception;
+
+   void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception;
- void deletePageTransactional(long txID, long recordID) throws Exception;
+ void deletePageTransactional(long recordID) throws Exception;
   /** This method is only useful at the backup side. We only load internal structures making the journals ready for
* append mode on the backup side. */
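In journal terms the interface change trades the transactional delete for an update record plus, eventually, a standalone delete. A rough sketch of the calls the new methods map to in JournalStorageManager below; the names are from this diff, the ordering comment is the point:

   // old: messageJournal.appendDeleteRecordTransactional(txID, recordID);
   // new:
   messageJournal.appendUpdateRecordTransactional(txID, recordID,
                                                  PAGE_TRANSACTION,
                                                  new PageUpdateTXEncoding(pageTxID, depages));
   // ... and only once every paged message of the TX has been consumed:
   messageJournal.appendDeleteRecord(recordID, false); // non-transactional, unsynced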
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -575,13 +575,6 @@
   public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
{
- if (pageTransaction.getRecordID() != 0)
- {
- // Instead of updating the record, we delete the old one as that is
- // better for reclaiming
-         messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID());
- }
-
pageTransaction.setRecordID(generateUniqueID());
messageJournal.appendAddRecordTransactional(txID,
@@ -590,6 +583,11 @@
pageTransaction);
}
+   public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception
+   {
+      messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), JournalStorageManager.PAGE_TRANSACTION, new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages));
+   }
+
   public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
messageJournal.appendUpdateRecordTransactional(txID,
@@ -623,9 +621,9 @@
messageJournal.appendDeleteRecord(id, true, getContext(true));
}
-   public void deletePageTransactional(final long txID, final long recordID) throws Exception
+ public void deletePageTransactional(final long recordID) throws Exception
{
- messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ messageJournal.appendDeleteRecord(recordID, false);
}
public void updateScheduledDeliveryTimeTransactional(final long txID, final
MessageReference ref) throws Exception
@@ -907,14 +905,27 @@
}
case PAGE_TRANSACTION:
{
-            PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+ if (record.isUpdate)
+ {
+ PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
+
+ pageUpdate.decode(buff);
+
+               PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX);
+
+               pageTX.update(pageUpdate.records, null);
+ }
+ else
+ {
+               PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+
+ pageTransactionInfo.decode(buff);
+
+ pageTransactionInfo.setRecordID(record.id);
+
+ pagingManager.addTransaction(pageTransactionInfo);
+ }
- pageTransactionInfo.decode(buff);
-
- pageTransactionInfo.setRecordID(record.id);
-
- pagingManager.addTransaction(pageTransactionInfo);
-
break;
}
case SET_SCHEDULED_DELIVERY_TIME:
@@ -2006,7 +2017,49 @@
super(queueID);
}
}
+
+ private static class PageUpdateTXEncoding implements EncodingSupport
+ {
+
+ public long pageTX;
+
+      public int records;
+
+ public PageUpdateTXEncoding()
+ {
+ }
+
+ public PageUpdateTXEncoding(final long pageTX, final int records)
+ {
+ this.pageTX = pageTX;
+         this.records = records;
+ }
+
+ public void decode(HornetQBuffer buffer)
+ {
+ this.pageTX = buffer.readLong();
+         this.records = buffer.readInt();
+ }
+ /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeLong(pageTX);
+         buffer.writeInt(records);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
+ }
+
+ }
+
private static class ScheduledDeliveryEncoding extends QueueEncoding
{
long scheduledDeliveryTime;
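On reload, PAGE_TRANSACTION records now come in two flavors: the original add (a full PageTransactionInfoImpl) and updates (PageUpdateTXEncoding, carrying the page transaction id and how many messages were already depaged). A self-contained model of the replay arithmetic; the class and method names here are illustrative only:

   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.atomic.AtomicInteger;

   class PageTXReplaySketch
   {
      private final Map<Long, AtomicInteger> counters = new HashMap<Long, AtomicInteger>();

      void onAddRecord(final long pageTXID, final int numberOfMessages)
      {
         counters.put(pageTXID, new AtomicInteger(numberOfMessages));
      }

      void onUpdateRecord(final long pageTXID, final int depagedMessages)
      {
         // mirrors PageTransactionInfoImpl.update(): net the delivered messages out;
         // a counter at zero means the journal record can be deleted
         counters.get(pageTXID).addAndGet(-depagedMessages);
      }
   }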
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -436,4 +436,18 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deletePageTransactional(long)
+ */
+ public void deletePageTransactional(long recordID) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#updatePageTransaction(long, org.hornetq.core.paging.PageTransactionInfo, int)
+ */
+   public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception
+ {
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -58,9 +58,9 @@
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
+import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUIDGenerator;
@@ -614,11 +614,20 @@
else
{
Transaction tx = context.getTransaction();
+
boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
-
- if (!depage && message.storeIsPaging())
+
+         // if the TX paged at least one message on a given address, all the other addresses should also go to the paging cache now
+ boolean alreadyPaging = false;
+
+ if (tx.isPaging())
{
-
+ alreadyPaging = getPageOperation(tx).isPaging(message.getAddress());
+ }
+
+ if (!depage && message.storeIsPaging() || alreadyPaging)
+ {
+ tx.setPaging(true);
getPageOperation(tx).addMessageToPage(message);
if (startedTx)
{
@@ -1106,12 +1115,20 @@
{
      private final List<ServerMessage> messagesToPage = new ArrayList<ServerMessage>();
+      private final HashSet<SimpleString> addressesPaging = new HashSet<SimpleString>();
+
private Transaction subTX = null;
void addMessageToPage(final ServerMessage message)
{
messagesToPage.add(message);
+ addressesPaging.add(message.getAddress());
}
+
+ boolean isPaging(final SimpleString address)
+ {
+ return addressesPaging.contains(address);
+ }
public void afterCommit(final Transaction tx)
{
@@ -1229,7 +1246,15 @@
{
subTX = tx.copy();
}
+
route(message, subTX, false);
+
+ if (subTX.isContainsPersistent())
+ {
+               // The route wouldn't be able to update the persistent flag on the main TX
+ // If we don't do this we would eventually miss a commit record
+ tx.setContainsPersistent();
+ }
}
}
@@ -1244,7 +1269,7 @@
store.sync();
}
- storageManager.storePageTransaction(tx.getID(), pageTransaction);
+ pageTransaction.store(storageManager, tx);
}
}
}
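The routing change above makes paging sticky within a transaction: once a message paged on an address inside the TX, later sends consult the PageMessageOperation and keep paging, so intra-TX ordering between paged and non-paged messages is preserved. The decision, condensed from the diff:

   boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;

   boolean alreadyPaging = false;
   if (tx.isPaging()) // cheap flag; see the Transaction change below
   {
      alreadyPaging = getPageOperation(tx).isPaging(message.getAddress());
   }

   if (!depage && message.storeIsPaging() || alreadyPaging)
   {
      tx.setPaging(true);
      getPageOperation(tx).addMessageToPage(message);
   }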
Modified: trunk/src/main/org/hornetq/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/Transaction.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/transaction/Transaction.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -60,12 +60,20 @@
void removeOperation(TransactionOperation sync);
boolean hasTimedOut(long currentTime, int defaultTimeout);
+
+   /** We don't want to inspect the operations on every send, so we keep this paging attribute and only look at
+    * the PagingOperation when the attribute is true. */
+ boolean isPaging();
+
+ void setPaging(boolean paging);
void putProperty(int index, Object property);
Object getProperty(int index);
void setContainsPersistent();
+
+ boolean isContainsPersistent();
void setTimeout(int timeout);
Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -47,6 +47,8 @@
private final Xid xid;
private final long id;
+
+ private boolean paging = false;
private volatile State state = State.ACTIVE;
@@ -129,6 +131,11 @@
{
containsPersistent = true;
}
+
+ public boolean isContainsPersistent()
+ {
+ return containsPersistent;
+ }
public void setTimeout(final int timeout)
{
@@ -352,7 +359,17 @@
{
this.state = state;
}
+
+ public boolean isPaging()
+ {
+ return paging;
+ }
+ public void setPaging(boolean paging)
+ {
+ this.paging = paging;
+ }
+
public Xid getXid()
{
return xid;
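isContainsPersistent() pairs with the PostOfficeImpl change above: when a divert routes through a copied sub-transaction, the persistent flag set on the copy has to be reflected back on the main transaction, or the main TX could commit without ever writing a commit record. Usage, exactly as in this commit:

   route(message, subTX, false);

   if (subTX.isContainsPersistent())
   {
      // route() could only mark the copy; propagate the flag to the main TX
      tx.setContainsPersistent();
   }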
Modified: trunk/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java
===================================================================
--- trunk/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/jms/persistence/config/PersistedDestination.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -14,6 +14,7 @@
package org.hornetq.jms.persistence.config;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.utils.BufferHelper;
import org.hornetq.utils.DataConstants;
@@ -112,16 +113,17 @@
public void encode(final HornetQBuffer buffer)
{
buffer.writeByte(type.getType());
- buffer.writeString(name);
- buffer.writeNullableString(selector);
+ buffer.writeSimpleString(SimpleString.toSimpleString(name));
+ buffer.writeNullableSimpleString(SimpleString.toSimpleString(selector));
buffer.writeBoolean(durable);
}
public void decode(final HornetQBuffer buffer)
{
type = PersistedType.getType(buffer.readByte());
- name = buffer.readString();
- selector = buffer.readNullableString();
+ name = buffer.readSimpleString().toString();
+ SimpleString selectorStr = buffer.readNullableSimpleString();
+ selector = (selectorStr == null) ? null : selectorStr.toString();
durable = buffer.readBoolean();
}
}
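The encode/decode change keeps the selector null-safe, assuming SimpleString.toSimpleString() passes null through (which the nullable write here relies on): writeNullableSimpleString/readNullableSimpleString preserve the null, so the single check on decode is all that is needed. The round trip in isolation:

   buffer.writeSimpleString(SimpleString.toSimpleString(name));
   buffer.writeNullableSimpleString(SimpleString.toSimpleString(selector)); // selector may be null

   name = buffer.readSimpleString().toString();
   SimpleString selectorStr = buffer.readNullableSimpleString();
   selector = (selectorStr == null) ? null : selectorStr.toString();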
Modified: trunk/src/main/org/hornetq/utils/ReusableLatch.java
===================================================================
--- trunk/src/main/org/hornetq/utils/ReusableLatch.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/src/main/org/hornetq/utils/ReusableLatch.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -87,6 +87,11 @@
}
int newState = actualState - numberOfReleases;
+
+ if (newState < 0)
+ {
+ newState = 0;
+ }
if (compareAndSetState(actualState, newState))
{
@@ -128,6 +133,12 @@
control.releaseShared(1);
}
+
+ public void countDown(final int count)
+ {
+ control.releaseShared(count);
+ }
+
public void await() throws InterruptedException
{
control.acquireSharedInterruptibly(1);
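countDown(int) plus the new floor-at-zero guard let a whole batch of completions be acknowledged in one call without the internal count ever going negative. A self-contained model of the same semantics (not the actual ReusableLatch, which also supports counting up):

   import java.util.concurrent.locks.AbstractQueuedSynchronizer;

   class CountingLatchSketch
   {
      @SuppressWarnings("serial")
      private static class Sync extends AbstractQueuedSynchronizer
      {
         Sync(final int count)
         {
            setState(count);
         }

         @Override
         protected int tryAcquireShared(final int ignored)
         {
            return getState() == 0 ? 1 : -1;
         }

         @Override
         protected boolean tryReleaseShared(final int numberOfReleases)
         {
            for (;;)
            {
               int actualState = getState();
               int newState = actualState - numberOfReleases;
               if (newState < 0)
               {
                  newState = 0; // the same clamp this commit adds to ReusableLatch
               }
               if (compareAndSetState(actualState, newState))
               {
                  return newState == 0; // zero releases all waiters
               }
            }
         }
      }

      private final Sync control = new Sync(0);

      public void countDown(final int count)
      {
         control.releaseShared(count);
      }

      public void await() throws InterruptedException
      {
         control.acquireSharedInterruptibly(1);
      }
   }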
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -14,8 +14,10 @@
package org.hornetq.tests.integration.client;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import junit.framework.AssertionFailedError;
@@ -30,6 +32,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.server.HornetQServer;
@@ -38,7 +41,6 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.DataConstants;
/**
* A PagingTest
@@ -86,12 +88,192 @@
internaltestSendReceivePaging(true);
}
-
public void testSendReceivePagingNonPersistent() throws Exception
{
internaltestSendReceivePaging(false);
}
+ public void testWithDiverts() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+      DivertConfiguration divert1 = new DivertConfiguration("dv1",
+                                                            "nm1",
+                                                            PagingTest.ADDRESS.toString(),
+                                                            PagingTest.ADDRESS.toString() + "-1",
+                                                            true,
+                                                            null,
+                                                            null);
+
+      DivertConfiguration divert2 = new DivertConfiguration("dv2",
+                                                            "nm2",
+                                                            PagingTest.ADDRESS.toString(),
+                                                            PagingTest.ADDRESS.toString() + "-2",
+                                                            true,
+                                                            null,
+                                                            null);
+
+      ArrayList<DivertConfiguration> divertList = new ArrayList<DivertConfiguration>();
+ divertList.add(divert1);
+ divertList.add(divert2);
+
+ config.setDivertConfigurations(divertList);
+
+ server.start();
+
+ final int numberOfIntegers = 256;
+
+ final int numberOfMessages = 30000;
+
+ final byte[] body = new byte[numberOfIntegers * 4];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= numberOfIntegers; j++)
+ {
+ bb.putInt(j);
+ }
+
+ try
+ {
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonDurableSend(true);
+ sf.setBlockOnDurableSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS + "-1", PagingTest.ADDRESS +
"-1", null, true);
+
+ session.createQueue(PagingTest.ADDRESS + "-2", PagingTest.ADDRESS +
"-2", null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ }
+
+ session.commit();
+
+ session.close();
+
+ server.stop();
+ }
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+ final ClientSessionFactory sf2 = createInVMFactory();
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ Thread threads[] = new Thread[2];
+
+ for (int start = 1; start <= 2; start++)
+ {
+
+         final String addressToSubscribe = PagingTest.ADDRESS + "-" + start;
+
+ threads[start - 1] = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+                  ClientSession session = sf2.createSession(null, null, false, true, true, false, 0);
+
+                  ClientConsumer consumer = session.createConsumer(addressToSubscribe);
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+                     ClientMessage message2 = consumer.receive(PagingTest.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+                     Assert.assertEquals(i, message2.getIntProperty("id").intValue());
+
+ message2.acknowledge();
+
+ Assert.assertNotNull(message2);
+
+ session.commit();
+
+ try
+ {
+ assertBodiesEqual(body, message2.getBodyBuffer());
+ }
+ catch (AssertionFailedError e)
+ {
+ PagingTest.log.info("Expected buffer:" +
UnitTestCase.dumbBytesHex(body, 40));
+ PagingTest.log.info("Arriving buffer:" +
UnitTestCase.dumbBytesHex(message2.getBodyBuffer()
+
.toByteBuffer()
+
.array(), 40));
+ throw e;
+ }
+ }
+
+ consumer.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+
+ }
+ };
+ }
+
+ for (int i = 0; i < 2; i++)
+ {
+ threads[i].start();
+ }
+
+ for (int i = 0; i < 2; i++)
+ {
+ threads[i].join();
+ }
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
   private void internaltestSendReceivePaging(final boolean persistentMessages) throws Exception
{
clearData();
@@ -150,11 +332,10 @@
session.close();
-
if (persistentMessages)
{
server.stop();
-
+
server = createServer(true,
config,
PagingTest.PAGE_SIZE,
@@ -360,8 +541,7 @@
}
}
-
-
+
/**
* - Make a destination in page mode
* - Add stuff to a transaction
@@ -396,7 +576,7 @@
sf.setBlockOnNonDurableSend(true);
sf.setBlockOnDurableSend(true);
sf.setBlockOnAcknowledge(true);
-
+
byte[] body = new byte[messageSize];
      ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0);
@@ -408,7 +588,7 @@
      ClientMessage firstMessage = sessionTransacted.createMessage(IS_DURABLE_MESSAGE);
firstMessage.getBodyBuffer().writeBytes(body);
firstMessage.putIntProperty(new SimpleString("id"), 0);
-
+
producerTransacted.send(firstMessage);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
@@ -479,7 +659,7 @@
         Integer messageID = (Integer)message.getObjectProperty(new SimpleString("id"));
-// System.out.println(messageID);
+ // System.out.println(messageID);
Assert.assertNotNull(messageID);
Assert.assertEquals("message received out of order", i,
messageID.intValue());
@@ -505,8 +685,133 @@
}
-
+ public void testDepageDuringTransaction3() throws Exception
+ {
+ clearData();
+ Configuration config = createDefaultConfig();
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024; // 1k
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonDurableSend(true);
+ sf.setBlockOnDurableSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ byte[] body = new byte[messageSize];
+
+         ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0);
+         ClientProducer producerTransacted = sessionTransacted.createProducer(PagingTest.ADDRESS);
+
+ ClientSession sessionNonTX = sf.createSession(true, true, 0);
+ sessionNonTX.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+         ClientProducer producerNonTransacted = sessionNonTX.createProducer(PagingTest.ADDRESS);
+
+ sessionNonTX.start();
+
+ for (int i = 0; i < 50; i++)
+ {
+ System.out.println("Sending " + i);
+ ClientMessage message = sessionNonTX.createMessage(true);
+ message.getBodyBuffer().writeBytes(body);
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producerTransacted.send(message);
+
+ if (i % 2 == 0)
+ {
+ System.out.println("Sending 20 msgs to make it page");
+ for (int j = 0 ; j < 20; j++)
+ {
+ ClientMessage msgSend = sessionNonTX.createMessage(true);
+ msgSend.getBodyBuffer().writeBytes(new byte[10 * 1024]);
+ producerNonTransacted.send(msgSend);
+ }
+               assertTrue(server.getPostOffice().getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging());
+ }
+ else
+ {
+ System.out.println("Consuming 20 msgs to make it page");
+               ClientConsumer consumer = sessionNonTX.createConsumer(PagingTest.ADDRESS);
+ for (int j = 0 ; j < 20; j++)
+ {
+ ClientMessage msgReceived = consumer.receive(10000);
+ assertNotNull(msgReceived);
+ msgReceived.acknowledge();
+ }
+ consumer.close();
+ }
+ }
+
+ ClientConsumer consumerNonTX = sessionNonTX.createConsumer(PagingTest.ADDRESS);
+ while (true)
+ {
+ ClientMessage msgReceived = consumerNonTX.receive(1000);
+ if (msgReceived == null)
+ {
+ break;
+ }
+ msgReceived.acknowledge();
+ }
+ consumerNonTX.close();
+
+
+ ClientConsumer consumer = sessionNonTX.createConsumer(PagingTest.ADDRESS);
+
+ Assert.assertNull(consumer.receiveImmediate());
+
+ sessionTransacted.commit();
+
+ sessionTransacted.close();
+
+ for (int i = 0; i < 50; i++)
+ {
+ ClientMessage message = consumer.receive(PagingTest.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message);
+
+            Integer messageID = (Integer)message.getObjectProperty(new SimpleString("id"));
+
+ // System.out.println(messageID);
+ Assert.assertNotNull(messageID);
+ Assert.assertEquals("message received out of order", i,
messageID.intValue());
+
+ System.out.println("MessageID = " + messageID);
+
+ message.acknowledge();
+ }
+
+ Assert.assertNull(consumer.receiveImmediate());
+
+ consumer.close();
+
+ sessionNonTX.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testPageOnSchedulingNoRestart() throws Exception
{
internalTestPageOnScheduling(false);
@@ -756,8 +1061,6 @@
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
-
-
for (int i = 0; i < numberOfMessages; i++)
{
@@ -955,7 +1258,7 @@
}
}
-
+
public void testDropMessagesExpiring() throws Exception
{
clearData();
@@ -969,7 +1272,7 @@
settings.put(PagingTest.ADDRESS.toString(), set);
- HornetQServer server = createServer(true, config, 1024, 1024 * 1024, settings);
+ HornetQServer server = createServer(true, config, 1024, 1024 * 1024, settings);
server.start();
@@ -978,7 +1281,7 @@
try
{
ClientSessionFactory sf = createInVMFactory();
-
+
sf.setAckBatchSize(0);
ClientSession session = sf.createSession();
@@ -988,7 +1291,7 @@
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
-
+
class MyHandler implements MessageHandler
{
int count;
@@ -1001,16 +1304,16 @@
}
catch (Exception e)
{
-
+
}
-
+
count++;
-
+
if (count % 1000 == 0)
{
log.info("received " + count);
}
-
+
try
{
message.acknowledge();
@@ -1019,13 +1322,13 @@
{
e.printStackTrace();
}
- }
+ }
}
-
+
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
-
+
consumer.setMessageHandler(new MyHandler());
for (int i = 0; i < numberOfMessages; i++)
@@ -1034,12 +1337,12 @@
message = session.createMessage(false);
message.getBodyBuffer().writeBytes(body);
-
+
message.setExpiration(System.currentTimeMillis() + 100);
producer.send(message);
}
-
+
session.close();
}
finally
Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -53,9 +53,9 @@
{
private static final Logger log = Logger.getLogger(SessionFactoryTest.class);
- private final String groupAddress = "230.1.2.3";
+ private final String groupAddress = getUDPDiscoveryAddress();
- private final int groupPort = 8765;
+ private final int groupPort = getUDPDiscoveryPort();
private HornetQServer liveService;
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -80,8 +80,8 @@
final String forwardAddress = "forwardAddress";
final String queueName1 = "queue1";
- final String groupAddress = "230.1.2.3";
- final int port = 7746;
+ final String groupAddress = getUDPDiscoveryAddress();
+ final int port = getUDPDiscoveryPort();
      List<Pair<String, String>> connectorPairs = new ArrayList<Pair<String, String>>();
connectorPairs.add(new Pair<String, String>(server1tc.getName(), null));
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -28,9 +28,9 @@
{
   private static final Logger log = Logger.getLogger(SymmetricClusterWithDiscoveryTest.class);
- protected static final String groupAddress = "230.1.2.3";
+ protected static final String groupAddress = getUDPDiscoveryAddress();
- protected static final int groupPort = 6745;
+ protected static final int groupPort = getUDPDiscoveryPort();
protected boolean isNetty()
{
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/DiscoveryClusterWithBackupFailoverTest.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -35,9 +35,9 @@
{
   private static final Logger log = Logger.getLogger(DiscoveryClusterWithBackupFailoverTest.class);
- protected static final String groupAddress = "230.1.2.3";
+ protected static final String groupAddress = getUDPDiscoveryAddress();
- protected static final int groupPort = 6745;
+ protected static final int groupPort = getUDPDiscoveryPort();
@Override
protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
Modified: trunk/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -53,16 +53,16 @@
{
private static final Logger log = Logger.getLogger(DiscoveryTest.class);
- private static final String address1 = "230.1.2.3";
+ private static final String address1 = getUDPDiscoveryAddress();
- private static final String address2 = "230.1.2.4";
+ private static final String address2 = getUDPDiscoveryAddress(1);
- private static final String address3 = "230.1.2.5";
+ private static final String address3 = getUDPDiscoveryAddress(2);
public void testSimpleBroadcast() throws Exception
{
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
final String nodeID = RandomUtil.randomString();
@@ -122,7 +122,7 @@
public void testSimpleBroadcastSpecificNIC() throws Exception
{
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
final String nodeID = RandomUtil.randomString();
@@ -222,7 +222,7 @@
public void testSimpleBroadcastWithStopStartDiscoveryGroup() throws Exception
{
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
final String nodeID = RandomUtil.randomString();
@@ -304,7 +304,7 @@
public void testIgnoreTrafficFromOwnNode() throws Exception
{
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
String nodeID = RandomUtil.randomString();
@@ -357,7 +357,7 @@
// public void testSimpleBroadcastDifferentAddress() throws Exception
// {
// final InetAddress groupAddress = InetAddress.getByName(address1);
- // final int groupPort = 6745;
+ // final int groupPort = getUDPDiscoveryPort();
// final int timeout = 500;
//
   // BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
@@ -394,8 +394,8 @@
public void testSimpleBroadcastDifferentPort() throws Exception
{
- final InetAddress groupAddress = InetAddress.getByName("230.1.2.3");
- final int groupPort = 6745;
+ final InetAddress groupAddress = InetAddress.getByName(getUDPDiscoveryAddress());
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
BroadcastGroup bg = new BroadcastGroupImpl(RandomUtil.randomString(),
@@ -417,7 +417,7 @@
bg.addConnectorPair(connectorPair);
- final int port2 = 6746;
+ final int port2 = getUDPDiscoveryPort(1);
DiscoveryGroup dg = new DiscoveryGroupImpl(RandomUtil.randomString(),
RandomUtil.randomString(),
@@ -442,7 +442,7 @@
public void testSimpleBroadcastDifferentAddressAndPort() throws Exception
{
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
BroadcastGroup bg = new BroadcastGroupImpl(RandomUtil.randomString(),
@@ -465,7 +465,7 @@
bg.addConnectorPair(connectorPair);
final InetAddress groupAddress2 = InetAddress.getByName(DiscoveryTest.address2);
- final int port2 = 6746;
+ final int port2 = getUDPDiscoveryPort(1);
DiscoveryGroup dg = new DiscoveryGroupImpl(RandomUtil.randomString(),
RandomUtil.randomString(),
@@ -490,13 +490,13 @@
public void testMultipleGroups() throws Exception
{
final InetAddress groupAddress1 = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort1 = 6745;
+ final int groupPort1 = getUDPDiscoveryPort();
final InetAddress groupAddress2 = InetAddress.getByName(DiscoveryTest.address2);
- final int groupPort2 = 6746;
+ final int groupPort2 = getUDPDiscoveryPort(1);
final InetAddress groupAddress3 = InetAddress.getByName(DiscoveryTest.address3);
- final int groupPort3 = 6747;
+ final int groupPort3 = getUDPDiscoveryPort(2);
final int timeout = 500;
@@ -624,7 +624,7 @@
public void testBroadcastNullBackup() throws Exception
{
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
String nodeID = RandomUtil.randomString();
@@ -677,7 +677,7 @@
public void testDiscoveryListenersCalled() throws Exception
{
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
String nodeID = RandomUtil.randomString();
@@ -745,7 +745,7 @@
public void testConnectorsUpdatedMultipleBroadcasters() throws Exception
{
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
String node1 = RandomUtil.randomString();
@@ -1014,7 +1014,7 @@
public void testMultipleDiscoveryGroups() throws Exception
{
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
String nodeID = RandomUtil.randomString();
@@ -1105,7 +1105,7 @@
notifService.addNotificationListener(notifListener);
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
final int timeout = 500;
DiscoveryGroup dg = new DiscoveryGroupImpl(RandomUtil.randomString(),
@@ -1144,7 +1144,7 @@
notifService.addNotificationListener(notifListener);
final InetAddress groupAddress = InetAddress.getByName(DiscoveryTest.address1);
- final int groupPort = 6745;
+ final int groupPort = getUDPDiscoveryPort();
BroadcastGroup bg = new BroadcastGroupImpl(RandomUtil.randomString(),
RandomUtil.randomString(),
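Throughout these test changes, the hard-coded multicast groups ("230.1.2.3", port 6745, and so on) are replaced by getUDPDiscoveryAddress()/getUDPDiscoveryPort() from the test base class, so concurrent test runs don't collide on a shared UDP group. The helpers themselves are not part of this diff; a plausible shape, inferred from the replaced literals and the int overloads used above (hypothetical, not the actual implementation):

   public static String getUDPDiscoveryAddress()
   {
      return getUDPDiscoveryAddress(0);
   }

   public static String getUDPDiscoveryAddress(final int variant)
   {
      // assumed: a distinct group per variant, matching the old 230.1.2.3/4/5 literals
      return "230.1.2." + (3 + variant);
   }

   public static int getUDPDiscoveryPort()
   {
      return getUDPDiscoveryPort(0);
   }

   public static int getUDPDiscoveryPort(final int variant)
   {
      return 6745 + variant;
   }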
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -51,9 +51,9 @@
{
   private static final Logger log = Logger.getLogger(HornetQConnectionFactoryTest.class);
- private final String groupAddress = "230.1.2.3";
+ private final String groupAddress = getUDPDiscoveryAddress();
- private final int groupPort = 8765;
+ private final int groupPort = getUDPDiscoveryPort();
private HornetQServer liveService;
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlRestartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlRestartTest.java	2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlRestartTest.java	2010-09-10 17:44:13 UTC (rev 9668)
@@ -13,27 +13,36 @@
package org.hornetq.tests.integration.jms.server.management;
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.Message;
import javax.jms.Queue;
+import javax.jms.QueueRequestor;
+import javax.jms.QueueSession;
+import javax.jms.Session;
import junit.framework.Assert;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ObjectNameBuilder;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.management.JMSManagementHelper;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
-import org.hornetq.jms.persistence.JMSStorageManager;
-import org.hornetq.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.integration.management.ManagementControlHelper;
import org.hornetq.tests.integration.management.ManagementTestBase;
import org.hornetq.tests.unit.util.InVMContext;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.TimeAndCounterIDGenerator;
/**
* A JMSServerControlRestartTest
@@ -51,17 +60,15 @@
protected InVMContext context;
- private HornetQServer server;
+ private JMSServerManager serverManager;
- private JMSServerManagerImpl serverManager;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
- public void testCreateDurableQueueAndRestartServer() throws Exception
+ public void testCreateDurableQueueUsingJMXAndRestartServer() throws Exception
{
String queueName = RandomUtil.randomString();
String binding = RandomUtil.randomString();
@@ -79,10 +86,52 @@
checkResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
serverManager.stop();
+
+ checkNoBinding(context, binding);
+ checkNoResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
+ serverManager = createJMSServer();
+ serverManager.start();
+
+ o = UnitTestCase.checkBinding(context, binding);
+ Assert.assertTrue(o instanceof Queue);
+ queue = (Queue)o;
+ assertEquals(queueName, queue.getQueueName());
+ checkResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
+ }
+
+ public void testCreateDurableQueueUsingJMSAndRestartServer() throws Exception
+ {
+ String queueName = RandomUtil.randomString();
+ String binding = RandomUtil.randomString();
+
+ UnitTestCase.checkNoBinding(context, binding);
+ checkNoResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
+
+      TransportConfiguration config = new TransportConfiguration(InVMConnectorFactory.class.getName());
+      Connection connection = HornetQJMSClient.createConnectionFactory(config).createConnection();
+ connection.start();
+      Queue managementQueue = HornetQJMSClient.createQueue("hornetq.management");
+      QueueSession session = (QueueSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueRequestor requestor = new QueueRequestor(session, managementQueue);
+ Message message = session.createMessage();
+      JMSManagementHelper.putOperationInvocation(message, "jms.server", "createQueue", queueName, binding);
+ Message reply = requestor.request(message);
+ assertTrue(JMSManagementHelper.hasOperationSucceeded(reply));
+ connection.close();
+
+ Object o = UnitTestCase.checkBinding(context, binding);
+ Assert.assertTrue(o instanceof Queue);
+ Queue queue = (Queue)o;
+ assertEquals(queueName, queue.getQueueName());
+ checkResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
+
+ serverManager.stop();
+
checkNoBinding(context, binding);
checkNoResource(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(queueName));
+ serverManager = createJMSServer();
serverManager.start();
o = UnitTestCase.checkBinding(context, binding);
@@ -101,35 +150,35 @@
{
super.setUp();
+ serverManager = createJMSServer();
+ serverManager.start();
+ }
+
+   private JMSServerManager createJMSServer() throws Exception
+   {
Configuration conf = new ConfigurationImpl();
conf.setSecurityEnabled(false);
conf.setJMXManagementEnabled(true);
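+      // Persistence (NIO journal) is what lets the durable queue survive the restarts below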
+ conf.setPersistenceEnabled(true);
+ conf.setJournalType(JournalType.NIO);
conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
- server = HornetQServers.newHornetQServer(conf, mbeanServer, false);
+ HornetQServer server = HornetQServers.newHornetQServer(conf, mbeanServer);
context = new InVMContext();
-      JMSStorageManager storage = new JMSJournalStorageManagerImpl(new TimeAndCounterIDGenerator(),
- server.getConfiguration(),
- server.getReplicationManager());
-
- serverManager = new JMSServerManagerImpl(server, null, storage);
+ serverManager = new JMSServerManagerImpl(server);
serverManager.setContext(context);
- serverManager.start();
- serverManager.activated();
+ return serverManager;
}
-
+
@Override
protected void tearDown() throws Exception
{
+      String bindingDir = serverManager.getHornetQServer().getConfiguration().getBindingsDirectory();
serverManager.stop();
-
- server.stop();
-
serverManager = null;
+ System.out.println(bindingDir);
+ deleteDirectory(new File(bindingDir));
- server = null;
-
super.tearDown();
}
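
Note on the new JMS-side test above: stripped of the plumbing, creating a durable queue
through the management address is one QueueRequestor round-trip. A condensed sketch, with
every name taken verbatim from the diff above:

   // send an operation invocation to the server's management address and wait for the reply
   Queue mgmt = HornetQJMSClient.createQueue("hornetq.management");
   QueueRequestor requestor = new QueueRequestor(session, mgmt);   // session is a QueueSession
   Message m = session.createMessage();
   JMSManagementHelper.putOperationInvocation(m, "jms.server", "createQueue", queueName, binding);
   Message reply = requestor.request(m);                           // blocks until the server answers
   assertTrue(JMSManagementHelper.hasOperationSucceeded(reply));
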
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -23,11 +23,13 @@
import junit.framework.Assert;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AbstractJournalUpdateTask;
+import org.hornetq.core.journal.impl.ExportJournal;
import org.hornetq.core.journal.impl.JournalCompactor;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalFileImpl;
@@ -38,7 +40,6 @@
import org.hornetq.tests.unit.core.journal.impl.fakes.SimpleEncoding;
import org.hornetq.utils.IDGenerator;
import org.hornetq.utils.SimpleIDGenerator;
-import org.hornetq.utils.TimeAndCounterIDGenerator;
/**
*
@@ -53,7 +54,7 @@
private static final int NUMBER_OF_RECORDS = 1000;
- IDGenerator idGenerator = new TimeAndCounterIDGenerator();
+ IDGenerator idGenerator = new SimpleIDGenerator(100000);
// General tests
// =============
@@ -232,6 +233,119 @@
}
+ public void testCompactPrepareRestart() throws Exception
+ {
+ setup(2, 60 * 1024, false);
+
+ createJournal();
+
+ startJournal();
+
+ load();
+
+ startCompact();
+
+ addTx(1, 2);
+
+ prepare(1, new SimpleEncoding(10, (byte)0));
+
+ finishCompact();
+
+ stopJournal();
+
+ createJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+ startCompact();
+
+ commit(1);
+
+ finishCompact();
+
+ journal.compact();
+
+ stopJournal();
+
+ createJournal();
+
+ startJournal();
+
+ loadAndCheck();
+ }
+
+ public void testCompactPrepareRestart2() throws Exception
+ {
+ setup(2, 60 * 1024, false);
+
+ createJournal();
+
+ startJournal();
+
+ load();
+
+ addTx(1, 2);
+
+ prepare(1, new SimpleEncoding(10, (byte)0));
+
+ stopJournal();
+
+ createJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+ startCompact();
+
+ commit(1);
+
+ finishCompact();
+
+ journal.compact();
+
+ stopJournal();
+
+ createJournal();
+
+ startJournal();
+
+ loadAndCheck();
+ }
+
+ public void testCompactPrepareRestart3() throws Exception
+ {
+ setup(2, 60 * 1024, false);
+
+ createJournal();
+
+ startJournal();
+
+ load();
+
+ addTx(1, 2, 3);
+
+ prepare(1, new SimpleEncoding(10, (byte)0));
+
+ startCompact();
+
+ commit(1);
+
+ finishCompact();
+
+ journal.compact();
+
+ stopJournal();
+
+ createJournal();
+
+ startJournal();
+
+ loadAndCheck();
+ }
+
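
Note: the three testCompactPrepareRestart variants above share a single skeleton and
differ only in where the compactor runs relative to the prepare and the commit. Condensed,
using the helpers the tests already call:

   setup(2, 60 * 1024, false);                   // two 60 KiB journal files, NIO
   createJournal(); startJournal(); load();
   addTx(1, 2);                                  // tx 1 adds record 2
   prepare(1, new SimpleEncoding(10, (byte)0));  // leave tx 1 in the prepared state
   // ...stop/restart the journal and/or startCompact() here, depending on the variant...
   commit(1);                                    // may land between startCompact() and finishCompact()
   journal.compact();
   stopJournal(); createJournal(); startJournal(); loadAndCheck();
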
public void testOnRollback() throws Exception
{
@@ -277,13 +391,13 @@
journal.forceMoveNextFile();
addTx(1, 5, 6, 7, 8);
-
+
commit(1);
-
+
journal.forceMoveNextFile();
-
+
journal.compact();
-
+
add(10);
stopJournal();
@@ -806,7 +920,7 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
createJournal();
-
+
startJournal();
load();
@@ -931,7 +1045,7 @@
}
startCompact();
-
+
// Delete part of the live records while cleanup still working
for (int i = 1; i < 5; i++)
{
@@ -939,7 +1053,7 @@
}
finishCompact();
-
+
// Delete part of the live records after cleanup is done
for (int i = 5; i < 10; i++)
{
@@ -963,7 +1077,7 @@
setup(2, 60 * 1024, false);
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
-
+
createJournal();
startJournal();
@@ -978,7 +1092,7 @@
addTx(appendTX, appendOne);
startCompact();
-
+
addTx(appendTX, appendTwo);
commit(appendTX);
@@ -1161,7 +1275,7 @@
}
}
-
+
public void testCompactFirstFileWithPendingCommits() throws Exception
{
setup(2, 60 * 1024, true);
@@ -1175,10 +1289,165 @@
{
addTx(tx, idGenerator.generateID());
}
-
+
journal.forceMoveNextFile();
+
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ if (i == 5)
+ {
+ commit(tx);
+ }
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+      // This operation used to be journal.cleanup(journal.getDataFiles()[0]); when cleanup was still in place
+ journal.compact();
+
+ journal.checkReclaimStatus();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits3() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long tx = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ addTx(tx, idGenerator.generateID());
+ }
+
+ journal.forceMoveNextFile();
+
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+      ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out1.dmp");
+
+      ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out2.dmp");
+
+ rollback(tx);
+
+      ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out3.dmp");
+
+ journal.forceMoveNextFile();
+ journal.checkReclaimStatus();
+
+      ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out4.dmp");
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits2() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long tx = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ addTx(tx, idGenerator.generateID());
+ }
+
+ journal.forceMoveNextFile();
+
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ startCompact();
+ System.out.println("Committing TX " + tx);
commit(tx);
+ finishCompact();
+
+ journal.checkReclaimStatus();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits4() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long ids[] = new long[10];
+
+ long tx0 = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ ids[i] = idGenerator.generateID();
+ addTx(tx0, ids[i]);
+ }
+
+ long tx1 = idGenerator.generateID();
+ journal.forceMoveNextFile();
ArrayList<Long> listToDelete = new ArrayList<Long>();
for (int i = 0; i < 10; i++)
@@ -1187,21 +1456,157 @@
listToDelete.add(id);
add(id);
}
-
+
journal.forceMoveNextFile();
for (Long id : listToDelete)
{
delete(id);
}
+
+ journal.forceMoveNextFile();
+
+ startCompact();
+      System.out.println("Rolling back TX " + tx0 + ", then committing TX " + tx1);
+ rollback(tx0);
+ for (int i = 0 ; i < 10; i++)
+ {
+ addTx(tx1, ids[i]);
+ }
+
+ journal.forceMoveNextFile();
+ commit(tx1);
+ finishCompact();
+
+ journal.checkReclaimStatus();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits5() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long ids[] = new long[10];
+
+ long tx0 = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ ids[i] = idGenerator.generateID();
+ addTx(tx0, ids[i]);
+ }
+
+ long tx1 = idGenerator.generateID();
journal.forceMoveNextFile();
-
-      // This operation used to be journal.cleanup(journal.getDataFiles()[0]); when cleanup was still in place
+
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ startCompact();
+      System.out.println("Rolling back TX " + tx0 + ", then committing TX " + tx1);
+ rollback(tx0);
+ for (int i = 0 ; i < 10; i++)
+ {
+ addTx(tx1, ids[i]);
+ }
+
+ journal.forceMoveNextFile();
+ commit(tx1);
+ finishCompact();
+
+ journal.checkReclaimStatus();
+
journal.compact();
- journal.checkReclaimStatus();
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits6() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long ids[] = new long[10];
+
+ long tx0 = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ ids[i] = idGenerator.generateID();
+ addTx(tx0, ids[i]);
+ }
+ commit(tx0);
+
+ startCompact();
+ for (int i = 0 ; i < 10; i++)
+ {
+ delete(ids[i]);
+ }
+ finishCompact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testCompactFirstFileWithPendingCommits7() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long tx0 = idGenerator.generateID();
+ add(idGenerator.generateID());
+
+ long ids[] = new long[]{idGenerator.generateID(), idGenerator.generateID()};
+
+ addTx(tx0, ids[0]);
+ addTx(tx0, ids[1]);
+
+ journal.forceMoveNextFile();
+
+ commit(tx0);
+
+ journal.forceMoveNextFile();
+
+ delete(ids[0]);
+ delete(ids[1]);
+
+ journal.forceMoveNextFile();
+
journal.compact();
stopJournal();
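
Note: the testCompactFirstFileWithPendingCommits variants above all build the same journal
shape -- a transaction opened in the first file that is still pending while every later
file becomes reclaimable -- and then vary when and how that transaction is resolved
(commit or rollback; before, during, or after compaction). The common construction:

   long tx = idGenerator.generateID();
   for (int i = 0; i < 10; i++)
   {
      addTx(tx, idGenerator.generateID());   // the pending tx pins the first file
   }
   journal.forceMoveNextFile();              // then: add ten plain records, roll the file,
                                             // delete them, roll again; every file but
                                             // the first is now dead weight
   // ...resolve tx around startCompact()/finishCompact(), then restart and loadAndCheck()...
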
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -13,6 +13,10 @@
package org.hornetq.tests.integration.paging;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import junit.framework.Assert;
import org.hornetq.api.core.SimpleString;
@@ -58,7 +62,7 @@
AddressSettings defaultSetting = new AddressSettings();
defaultSetting.setPageSizeBytes(10 * 1024);
- defaultSetting.setMaxSizeBytes(100 * 1024);
+ defaultSetting.setMaxSizeBytes(20 * 1024);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
@@ -155,6 +159,113 @@
}
}
+ public void testOrderOverTX() throws Exception
+ {
+ HornetQServer server = newHornetQServer();
+
+ server.start();
+
+ try
+ {
+ ClientSessionFactory sf;
+
+ if (isNetty())
+ {
+ sf = createNettyFactory();
+ }
+ else
+ {
+ sf = createInVMFactory();
+ }
+
+ ClientSession sessionConsumer = sf.createSession(true, true, 0);
+
+         sessionConsumer.createQueue(PagingSendTest.ADDRESS, PagingSendTest.ADDRESS, null, true);
+
+ final ClientSession sessionProducer = sf.createSession(false, false);
+         final ClientProducer producer = sessionProducer.createProducer(PagingSendTest.ADDRESS);
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ final int TOTAL_MESSAGES = 1000;
+
+         // Release the consumer only after the producer has gone through a few commits
+ final CountDownLatch ready = new CountDownLatch(1);
+
+ Thread tProducer = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ int commit = 0;
+ for (int i = 0; i < TOTAL_MESSAGES; i++)
+ {
+ ClientMessage msg = sessionProducer.createMessage(true);
+ msg.getBodyBuffer().writeBytes(new byte[1024]);
+ msg.putIntProperty("count", i);
+ producer.send(msg);
+
+ if (i % 100 == 0 && i > 0)
+ {
+ sessionProducer.commit();
+ if (commit++ > 2)
+ {
+ ready.countDown();
+ }
+ }
+ }
+
+ sessionProducer.commit();
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+
+         ClientConsumer consumer = sessionConsumer.createConsumer(PagingSendTest.ADDRESS);
+
+ sessionConsumer.start();
+
+ tProducer.start();
+
+ assertTrue(ready.await(10, TimeUnit.SECONDS));
+
+ for (int i = 0; i < TOTAL_MESSAGES; i++)
+ {
+ ClientMessage msg = consumer.receive(10000);
+
+ Assert.assertNotNull(msg);
+
+ assertEquals(i, msg.getIntProperty("count").intValue());
+
+ msg.acknowledge();
+ }
+
+ tProducer.join();
+
+ sessionConsumer.close();
+
+ sessionProducer.close();
+
+ assertEquals(0, errors.get());
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
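
Note on the tightened limit above: with a 10 KiB page size and the address now capped at
20 KiB, the roughly 1 KiB bodies written by testOrderOverTX push the address into paging
after about twenty sends, well inside the first 100-message commit batch:

   page size        = 10 * 1024 bytes
   max address size = 20 * 1024 bytes
   message body     ~  1 * 1024 bytes each
   => paging starts after roughly 20 of the 1000 messages sent
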
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -119,7 +119,7 @@
}
else
{
- factory = new NIOSequentialFileFactory(dir.getPath());
+ factory = new NIOSequentialFileFactory(dir.getPath(), true);
maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO;
}
@@ -134,9 +134,6 @@
{
protected void onCompactLock() throws Exception
{
- // System.out.println("OnCompactLock");
- journal.forceMoveNextFile(false);
- // System.out.println("OnCompactLock done");
}
protected void onCompactStart() throws Exception
@@ -152,7 +149,7 @@
{
long id = idGen.generateID();
journal.appendAddRecord(id, (byte)0, new byte[] { 1, 2, 3 }, false);
- journal.forceMoveNextFile(false);
+ journal.forceMoveNextFile();
journal.appendDeleteRecord(id, id == 20);
}
// System.out.println("OnCompactStart leave");
@@ -283,6 +280,8 @@
journal.forceMoveNextFile();
+ journal.checkReclaimStatus();
+
Thread.sleep(5000);
assertEquals(0, journal.getDataFilesCount());
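
Note: two signature changes from this revision surface in the stress test above.
forceMoveNextFile() no longer takes a boolean, and NIOSequentialFileFactory gains a second
constructor argument -- presumably a buffering toggle, though only the call site is
visible here, so treat that reading as an assumption:

   SequentialFileFactory factory = new NIOSequentialFileFactory(dir.getPath(), true);
   journal.forceMoveNextFile();     // the boolean overload is gone
   journal.checkReclaimStatus();    // reclaiming is now an explicit step after a file roll
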
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -116,7 +116,11 @@
if (i % 2 == 0 && i > 0)
{
System.out.println("DataFiles = " + journal.getDataFilesCount());
+
journal.forceMoveNextFile();
+ journal.debugWait();
+ journal.checkReclaimStatus();
+
if (journal.getDataFilesCount() != 0)
{
System.out.println("DebugJournal:" + journal.debug());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -463,8 +463,12 @@
{
AlignedJournalImplTest.log.debug("Expected exception " + e, e);
}
+
setupAndLoadJournal(JOURNAL_SIZE, 100);
+
+ journalImpl.forceMoveNextFile();
+ journalImpl.checkReclaimStatus();
Assert.assertEquals(0, records.size());
Assert.assertEquals(0, transactions.size());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -125,22 +125,6 @@
Assert.assertEquals(nr1, trans2.getNumberOfMessages());
- for (int i = 0; i < nr1; i++)
- {
- trans.decrement();
- }
-
- Assert.assertEquals(0, trans.getNumberOfMessages());
-
- try
- {
- trans.decrement();
- Assert.fail("Exception expected!");
- }
- catch (Throwable ignored)
- {
- }
-
}
public void testDoubleStart() throws Exception
@@ -1355,6 +1339,20 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deletePageTransactional(long)
+ */
+ public void deletePageTransactional(long recordID) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#updatePageTransaction(long, org.hornetq.core.paging.PageTransactionInfo, int)
+ */
+   public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception
+ {
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -321,6 +321,33 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#isPaging()
+ */
+ public boolean isPaging()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#setPaging(boolean)
+ */
+ public void setPaging(boolean paging)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#isContainsPersistent()
+ */
+ public boolean isContainsPersistent()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
}
class FakeMessage implements ServerMessage
Modified: trunk/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/unit/util/ReusableLatchTest.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -30,6 +30,19 @@
{
private static final Logger log = Logger.getLogger(ReusableLatchTest.class);
+
+ public void testLatchWithParameterizedDown() throws Exception
+ {
+ ReusableLatch latch = new ReusableLatch(1000);
+
+ latch.countDown(5000);
+
+ assertTrue(latch.await(1000));
+
+ assertEquals(0, latch.getCount());
+ }
+
public void testLatchOnSingleThread() throws Exception
{
ReusableLatch latch = new ReusableLatch();
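
Note: testLatchWithParameterizedDown pins down the over-count behaviour of the
parameterized countDown(int): counting down by more than the remaining count clamps the
latch at zero instead of underflowing, so a subsequent await() returns immediately. In
sketch form, with the values from the test above:

   ReusableLatch latch = new ReusableLatch(1000);
   latch.countDown(5000);           // counting down past zero clamps at 0
   assertTrue(latch.await(1000));   // returns at once; nothing left to wait for
   assertEquals(0, latch.getCount());
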
Modified: trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-09-10 15:23:04 UTC (rev 9667)
+++ trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-09-10 17:44:13 UTC (rev 9668)
@@ -96,8 +96,36 @@
private final String testDir = System.getProperty("java.io.tmpdir", "/tmp") + "/hornetq-unit-test";
// Static --------------------------------------------------------
+
+
+ protected static String getUDPDiscoveryAddress()
+ {
+ return System.getProperty("TEST-UDP-ADDRESS", "230.10.20.1");
+ }
+
+ protected static String getUDPDiscoveryAddress(int variant)
+ {
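+      // e.g. the default "230.10.20.1" with variant 2 yields "230.10.20.3"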
+ String value = getUDPDiscoveryAddress();
+
+ int posPoint = value.lastIndexOf('.');
+
+      int last = Integer.valueOf(value.substring(posPoint + 1));
+
+ return value.substring(0, posPoint + 1) + (last + variant);
+ }
+
+ public static int getUDPDiscoveryPort()
+ {
+      return Integer.parseInt(System.getProperty("TEST-UDP-PORT", "6750"));
+ }
+ public static int getUDPDiscoveryPort(final int variant)
+ {
+      return getUDPDiscoveryPort() + variant;
+ }
+
+
protected static JournalType getDefaultJournalType()
{
if (AsynchronousFileImpl.isLoaded())