JBoss hornetq SVN: r12233 - tags.
by do-not-reply@jboss.org
Author: ataylor
Date: 2012-03-02 03:11:24 -0500 (Fri, 02 Mar 2012)
New Revision: 12233
Added:
tags/HornetQ_2_2_12_AS7_Final_Pending/
Log:
pending release for 2.2.12.Final
12 years, 2 months
JBoss hornetq SVN: r12232 - branches/Branch_2_2_AS7.
by do-not-reply@jboss.org
Author: ataylor
Date: 2012-03-02 03:08:08 -0500 (Fri, 02 Mar 2012)
New Revision: 12232
Modified:
branches/Branch_2_2_AS7/build-maven.xml
Log:
updated version
Modified: branches/Branch_2_2_AS7/build-maven.xml
===================================================================
--- branches/Branch_2_2_AS7/build-maven.xml 2012-03-02 01:11:53 UTC (rev 12231)
+++ branches/Branch_2_2_AS7/build-maven.xml 2012-03-02 08:08:08 UTC (rev 12232)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.2.11.Final"/>
+ <property name="hornetq.version" value="2.2.12.Final"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
12 years, 2 months
JBoss hornetq SVN: r12231 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2012-03-01 20:11:53 -0500 (Thu, 01 Mar 2012)
New Revision: 12231
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerFilterTest.java
Log:
fixing test
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerFilterTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerFilterTest.java 2012-03-01 23:41:09 UTC (rev 12230)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerFilterTest.java 2012-03-02 01:11:53 UTC (rev 12231)
@@ -212,8 +212,6 @@
ClientSession session = sf.createSession();
session.start();
-
- session.createQueue("foo", "foo", true);
ClientProducer producer = session.createProducer("foo");
12 years, 2 months
JBoss hornetq SVN: r12230 - branches/Branch_2_2_AS7/src/config/common.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-03-01 18:41:09 -0500 (Thu, 01 Mar 2012)
New Revision: 12230
Modified:
branches/Branch_2_2_AS7/src/config/common/hornetq-version.properties
Log:
version change
Modified: branches/Branch_2_2_AS7/src/config/common/hornetq-version.properties
===================================================================
--- branches/Branch_2_2_AS7/src/config/common/hornetq-version.properties 2012-03-01 20:09:08 UTC (rev 12229)
+++ branches/Branch_2_2_AS7/src/config/common/hornetq-version.properties 2012-03-01 23:41:09 UTC (rev 12230)
@@ -1,7 +1,7 @@
-hornetq.version.versionName=HQ_2_2_11_FINAL_AS7
+hornetq.version.versionName=HQ_2_2_12_FINAL_AS7
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
-hornetq.version.microVersion=11
+hornetq.version.microVersion=12
hornetq.version.incrementingVersion=122
hornetq.version.versionSuffix=Final
hornetq.version.versionTag=Final
12 years, 2 months
JBoss hornetq SVN: r12229 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-03-01 15:09:08 -0500 (Thu, 01 Mar 2012)
New Revision: 12229
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
JBPAPP-8174 - duh
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2012-03-01 20:08:46 UTC (rev 12228)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2012-03-01 20:09:08 UTC (rev 12229)
@@ -531,7 +531,10 @@
}
}
- bridge.flushExecutor();
+ if (bridge!= null)
+ {
+ bridge.flushExecutor();
+ }
}
// for testing
12 years, 2 months
JBoss hornetq SVN: r12228 - branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-03-01 15:08:46 -0500 (Thu, 01 Mar 2012)
New Revision: 12228
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
JBPAPP-8174 - duh
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2012-03-01 19:59:23 UTC (rev 12227)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2012-03-01 20:08:46 UTC (rev 12228)
@@ -531,7 +531,10 @@
}
}
- bridge.flushExecutor();
+ if (bridge!= null)
+ {
+ bridge.flushExecutor();
+ }
}
// for testing
12 years, 2 months
JBoss hornetq SVN: r12227 - in trunk/hornetq-core/src/main/java/org/hornetq: utils and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2012-03-01 14:59:23 -0500 (Thu, 01 Mar 2012)
New Revision: 12227
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/utils/LinkedListImpl.java
Log:
JBPAPP-8282 - merge on trunk
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java 2012-03-01 19:58:56 UTC (rev 12226)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java 2012-03-01 19:59:23 UTC (rev 12227)
@@ -252,19 +252,22 @@
}
return HandleStatus.BUSY;
}
-
- if (log.isTraceEnabled())
- {
- log.trace("Handling reference " + ref);
- }
-
final ServerMessage message = ref.getMessage();
if (filter != null && !filter.match(message))
{
+ if (log.isTraceEnabled())
+ {
+ log.trace("Reference " + ref + " is a noMatch on consumer " + this);
+ }
return HandleStatus.NO_MATCH;
}
+ if (log.isTraceEnabled())
+ {
+ log.trace("Handling reference " + ref);
+ }
+
if (!browseOnly)
{
if (!preAcknowledge)
Modified: trunk/hornetq-core/src/main/java/org/hornetq/utils/LinkedListImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/utils/LinkedListImpl.java 2012-03-01 19:58:56 UTC (rev 12226)
+++ trunk/hornetq-core/src/main/java/org/hornetq/utils/LinkedListImpl.java 2012-03-01 19:59:23 UTC (rev 12227)
@@ -34,7 +34,7 @@
private static final int INITIAL_ITERATOR_ARRAY_SIZE = 10;
- private Node<E> head = new Node<E>(null);
+ private final Node<E> head = new Node<E>(null);
private Node<E> tail = null;
@@ -55,7 +55,7 @@
public void addHead(E e)
{
Node<E> node = new Node<E>(e);
-
+
node.next = head.next;
node.prev = head;
@@ -66,6 +66,11 @@
{
tail = node;
}
+ else
+ {
+ // Need to set the previous element on the former head
+ node.next.prev = node;
+ }
size++;
}
12 years, 2 months
JBoss hornetq SVN: r12226 - in trunk/tests: unit-tests/src/test/java/org/hornetq/tests/unit/util and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2012-03-01 14:58:56 -0500 (Thu, 01 Mar 2012)
New Revision: 12226
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerFilterTest.java
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/LinkedListTest.java
Log:
JBPAPP-8282 - merge on trunk
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerFilterTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerFilterTest.java 2012-03-01 16:04:29 UTC (rev 12225)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerFilterTest.java 2012-03-01 19:58:56 UTC (rev 12226)
@@ -202,4 +202,86 @@
session.close();
}
+
+ public void testLinkedListOrder() throws Exception
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession();
+
+ session.start();
+
+ session.createQueue("foo", "foo", true);
+
+ ClientProducer producer = session.createProducer("foo");
+
+ ClientConsumer redConsumer = session.createConsumer("foo", "color='red'");
+
+ ClientConsumer anyConsumer = session.createConsumer("foo");
+
+ sendMessage(session, producer, "any", "msg1");
+
+ sendMessage(session, producer, "any", "msg2");
+
+ sendMessage(session, producer, "any", "msg3");
+
+ sendMessage(session, producer, "red", "msgRed4");
+
+ sendMessage(session, producer, "red", "msgRed5");
+
+ readConsumer("anyConsumer", anyConsumer);
+
+ readConsumer("anyConsumer", anyConsumer);
+
+ log.info("### closing consumer ###");
+
+ anyConsumer.close();
+
+ readConsumer("redConsumer", redConsumer);
+
+ readConsumer("redConsumer", redConsumer);
+
+ log.info("### recreating consumer ###");
+
+ anyConsumer = session.createConsumer("foo");
+
+ session.start();
+
+ readConsumer("anyConsumer", anyConsumer);
+
+ session.close();
+
+ sf.close();
+
+ locator.close();
+ }
+
+ /**
+ * @param consumer
+ * @throws HornetQException
+ */
+ private void readConsumer(String consumerName, ClientConsumer consumer) throws Exception
+ {
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+ System.out.println("consumer = " + consumerName + " message, color=" + message.getStringProperty("color") + ", msg = " + message.getStringProperty("value"));
+ message.acknowledge();
+ }
+
+ /**
+ * @param session
+ * @param producer
+ * @return
+ * @throws HornetQException
+ */
+ private void sendMessage(ClientSession session, ClientProducer producer, String color, String msg) throws Exception
+ {
+ ClientMessage anyMessage = session.createMessage(true);
+ anyMessage.putStringProperty("color", color);
+ anyMessage.putStringProperty("value", msg);
+ producer.send(anyMessage);
+ session.commit();
+ }
}
Modified: trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/LinkedListTest.java
===================================================================
--- trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/LinkedListTest.java 2012-03-01 16:04:29 UTC (rev 12225)
+++ trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/LinkedListTest.java 2012-03-01 19:58:56 UTC (rev 12226)
@@ -110,6 +110,76 @@
}
+ public void testAddHeadAndRemove()
+ {
+ final AtomicInteger count = new AtomicInteger(0);
+ class MyObject
+ {
+
+ public int payload;
+
+ MyObject(int payloadcount)
+ {
+ count.incrementAndGet();
+ this.payload = payloadcount;
+ }
+
+ protected void finalize() throws Exception
+ {
+ count.decrementAndGet();
+ }
+
+ public String toString()
+ {
+ return "" + payload;
+ }
+ };
+
+ LinkedListImpl<MyObject> objs = new LinkedListImpl<MyObject>();
+
+ // Initial add
+ for (int i = 1000; i >= 0; i--)
+ {
+ objs.addHead(new MyObject(i));
+ }
+ assertCount(1001, count);
+
+ LinkedListIterator<MyObject> iter = objs.iterator();
+
+ int countLoop = 0;
+ for (countLoop = 0 ; countLoop <= 1000; countLoop++)
+ {
+ MyObject obj = iter.next();
+ assertEquals(countLoop, obj.payload);
+ if (countLoop == 500 || countLoop == 1000)
+ {
+ iter.remove();
+ }
+ }
+
+ iter.close();
+
+ iter = objs.iterator();
+
+ countLoop = 0;
+ while (iter.hasNext())
+ {
+ if (countLoop == 500 || countLoop == 1000)
+ {
+ System.out.println("Jumping " + countLoop);
+ countLoop++;
+ }
+ MyObject obj = iter.next();
+ assertEquals(countLoop, obj.payload);
+ countLoop++;
+ }
+
+
+
+ assertCount(999, count);
+
+ }
+
/**
* @param count
*/
12 years, 2 months
JBoss hornetq SVN: r12225 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2012-03-01 11:04:29 -0500 (Thu, 01 Mar 2012)
New Revision: 12225
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util/LinkedListTest.java
Log:
Adding unit-test on JBPAPP-8282
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util/LinkedListTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util/LinkedListTest.java 2012-03-01 16:04:23 UTC (rev 12224)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util/LinkedListTest.java 2012-03-01 16:04:29 UTC (rev 12225)
@@ -110,6 +110,76 @@
}
+ public void testAddHeadAndRemove()
+ {
+ final AtomicInteger count = new AtomicInteger(0);
+ class MyObject
+ {
+
+ public int payload;
+
+ MyObject(int payloadcount)
+ {
+ count.incrementAndGet();
+ this.payload = payloadcount;
+ }
+
+ protected void finalize() throws Exception
+ {
+ count.decrementAndGet();
+ }
+
+ public String toString()
+ {
+ return "" + payload;
+ }
+ };
+
+ LinkedListImpl<MyObject> objs = new LinkedListImpl<MyObject>();
+
+ // Initial add
+ for (int i = 1000; i >= 0; i--)
+ {
+ objs.addHead(new MyObject(i));
+ }
+ assertCount(1001, count);
+
+ LinkedListIterator<MyObject> iter = objs.iterator();
+
+ int countLoop = 0;
+ for (countLoop = 0 ; countLoop <= 1000; countLoop++)
+ {
+ MyObject obj = iter.next();
+ assertEquals(countLoop, obj.payload);
+ if (countLoop == 500 || countLoop == 1000)
+ {
+ iter.remove();
+ }
+ }
+
+ iter.close();
+
+ iter = objs.iterator();
+
+ countLoop = 0;
+ while (iter.hasNext())
+ {
+ if (countLoop == 500 || countLoop == 1000)
+ {
+ System.out.println("Jumping " + countLoop);
+ countLoop++;
+ }
+ MyObject obj = iter.next();
+ assertEquals(countLoop, obj.payload);
+ countLoop++;
+ }
+
+
+
+ assertCount(999, count);
+
+ }
+
/**
* @param count
*/
12 years, 2 months
JBoss hornetq SVN: r12224 - branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: jbertram
Date: 2012-03-01 11:04:23 -0500 (Thu, 01 Mar 2012)
New Revision: 12224
Added:
branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/PrettyPrintHandler.java
branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/PrintDataAsXML.java
Modified:
branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
[HORNETQ-787] Initial commit of export functionality
Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2012-03-01 16:04:10 UTC (rev 12223)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2012-03-01 16:04:23 UTC (rev 12224)
@@ -13,52 +13,12 @@
package org.hornetq.core.persistence.impl.journal;
-import java.io.File;
-import java.io.PrintStream;
-import java.nio.ByteBuffer;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-
-import javax.transaction.xa.Xid;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.*;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.filter.Filter;
-import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.journal.IOCriticalErrorListener;
-import org.hornetq.core.journal.Journal;
-import org.hornetq.core.journal.JournalLoadInformation;
-import org.hornetq.core.journal.PreparedTransactionInfo;
-import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.SequentialFile;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.TransactionFailureCallback;
-import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
-import org.hornetq.core.journal.impl.JournalFile;
-import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.journal.impl.JournalReaderCallback;
-import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.journal.*;
+import org.hornetq.core.journal.impl.*;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PageTransactionInfo;
@@ -82,11 +42,8 @@
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.replication.impl.ReplicatedJournal;
-import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.*;
import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.transaction.ResourceManager;
@@ -95,20 +52,27 @@
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
-import org.hornetq.utils.Base64;
-import org.hornetq.utils.DataConstants;
-import org.hornetq.utils.ExecutorFactory;
-import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.XidCodecSupport;
+import org.hornetq.utils.*;
+import javax.transaction.xa.Xid;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamWriter;
+import java.io.File;
+import java.io.PrintStream;
+import java.lang.reflect.Proxy;
+import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.*;
+import java.util.LinkedList;
+import java.util.concurrent.*;
+
/**
- *
* A JournalStorageManager
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
- *
*/
public class JournalStorageManager implements StorageManager
{
@@ -158,7 +122,7 @@
public static final byte PAGE_CURSOR_COUNTER_VALUE = 40;
public static final byte PAGE_CURSOR_COUNTER_INC = 41;
-
+
private final Semaphore pageMaxConcurrentIO;
private final BatchingIDGenerator idGenerator;
@@ -170,12 +134,14 @@
private final Journal bindingsJournal;
private final SequentialFileFactory largeMessagesFactory;
-
+
private SequentialFileFactory journalFF = null;
private volatile boolean started;
- /** Used to create Operation Contexts */
+ /**
+ * Used to create Operation Contexts
+ */
private final ExecutorFactory executorFactory;
private final Executor executor;
@@ -247,17 +213,17 @@
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir, criticalErrorListener);
Journal localBindings = new JournalImpl(1024 * 1024,
- 2,
- config.getJournalCompactMinFiles(),
- config.getJournalCompactPercentage(),
- bindingsFF,
- "hornetq-bindings",
- "bindings",
- 1);
+ 2,
+ config.getJournalCompactMinFiles(),
+ config.getJournalCompactPercentage(),
+ bindingsFF,
+ "hornetq-bindings",
+ "bindings",
+ 1);
if (replicator != null)
{
- bindingsJournal = new ReplicatedJournal((byte)0, localBindings, replicator);
+ bindingsJournal = new ReplicatedJournal((byte) 0, localBindings, replicator);
}
else
{
@@ -280,20 +246,20 @@
JournalStorageManager.log.info("Using AIO Journal");
journalFF = new AIOSequentialFileFactory(journalDir,
- config.getJournalBufferSize_AIO(),
- config.getJournalBufferTimeout_AIO(),
- config.isLogJournalWriteRate(),
- criticalErrorListener);
+ config.getJournalBufferSize_AIO(),
+ config.getJournalBufferTimeout_AIO(),
+ config.isLogJournalWriteRate(),
+ criticalErrorListener);
}
else if (config.getJournalType() == JournalType.NIO)
{
JournalStorageManager.log.info("Using NIO Journal");
journalFF = new NIOSequentialFileFactory(journalDir,
- true,
- config.getJournalBufferSize_NIO(),
- config.getJournalBufferTimeout_NIO(),
- config.isLogJournalWriteRate(),
- criticalErrorListener);
+ true,
+ config.getJournalBufferSize_NIO(),
+ config.getJournalBufferTimeout_NIO(),
+ config.isLogJournalWriteRate(),
+ criticalErrorListener);
}
else
{
@@ -309,18 +275,18 @@
idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, bindingsJournal);
}
Journal localMessage = new JournalImpl(config.getJournalFileSize(),
- config.getJournalMinFiles(),
- config.getJournalCompactMinFiles(),
- config.getJournalCompactPercentage(),
- journalFF,
- "hornetq-data",
- "hq",
- config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO()
- : config.getJournalMaxIO_NIO());
+ config.getJournalMinFiles(),
+ config.getJournalCompactMinFiles(),
+ config.getJournalCompactPercentage(),
+ journalFF,
+ "hornetq-data",
+ "hq",
+ config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO()
+ : config.getJournalMaxIO_NIO());
if (replicator != null)
{
- messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
+ messageJournal = new ReplicatedJournal((byte) 1, localMessage, replicator);
}
else
{
@@ -332,7 +298,7 @@
largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false, criticalErrorListener);
perfBlastPages = config.getJournalPerfBlastPages();
-
+
if (config.getPageMaxConcurrentIO() != 1)
{
pageMaxConcurrentIO = new Semaphore(config.getPageMaxConcurrentIO());
@@ -483,7 +449,7 @@
replicator.largeMessageBegin(id);
}
- LargeServerMessageImpl largeMessage = (LargeServerMessageImpl)createLargeMessage();
+ LargeServerMessageImpl largeMessage = (LargeServerMessageImpl) createLargeMessage();
largeMessage.copyHeadersAndProperties(message);
@@ -507,10 +473,10 @@
long recordID = generateUniqueID();
messageJournal.appendAddRecord(recordID,
- ADD_LARGE_MESSAGE_PENDING,
- new PendingLargeMessageEncoding(messageID),
- true,
- getContext(true));
+ ADD_LARGE_MESSAGE_PENDING,
+ new PendingLargeMessageEncoding(messageID),
+ true,
+ getContext(true));
return recordID;
}
@@ -519,11 +485,13 @@
{
installLargeMessageConfirmationOnTX(tx, recordID);
messageJournal.appendDeleteRecordTransactional(tx.getID(),
- recordID,
- new DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
+ recordID,
+ new DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
}
- /** We don't need messageID now but we are likely to need it we ever decide to support a database */
+ /**
+ * We don't need messageID now, but we are likely to need it if we ever decide to support a database
+ */
public void confirmPendingLargeMessage(long recordID) throws Exception
{
messageJournal.appendDeleteRecord(recordID, true, getContext());
@@ -542,18 +510,18 @@
if (message.isLargeMessage())
{
messageJournal.appendAddRecord(message.getMessageID(),
- JournalStorageManager.ADD_LARGE_MESSAGE,
- new LargeMessageEncoding((LargeServerMessage)message),
- false,
- getContext(false));
+ JournalStorageManager.ADD_LARGE_MESSAGE,
+ new LargeMessageEncoding((LargeServerMessage) message),
+ false,
+ getContext(false));
}
else
{
messageJournal.appendAddRecord(message.getMessageID(),
- JournalStorageManager.ADD_MESSAGE,
- message,
- false,
- getContext(false));
+ JournalStorageManager.ADD_MESSAGE,
+ message,
+ false,
+ getContext(false));
}
}
@@ -561,19 +529,19 @@
{
messageJournal.appendUpdateRecord(messageID,
- JournalStorageManager.ADD_REF,
- new RefEncoding(queueID),
- last && syncNonTransactional,
- getContext(last && syncNonTransactional));
+ JournalStorageManager.ADD_REF,
+ new RefEncoding(queueID),
+ last && syncNonTransactional,
+ getContext(last && syncNonTransactional));
}
public void storeAcknowledge(final long queueID, final long messageID) throws Exception
{
messageJournal.appendUpdateRecord(messageID,
- JournalStorageManager.ACKNOWLEDGE_REF,
- new RefEncoding(queueID),
- syncNonTransactional,
- getContext(syncNonTransactional));
+ JournalStorageManager.ACKNOWLEDGE_REF,
+ new RefEncoding(queueID),
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
@@ -581,10 +549,10 @@
long ackID = idGenerator.generateID();
position.setRecordID(ackID);
messageJournal.appendAddRecord(ackID,
- ACKNOWLEDGE_CURSOR,
- new CursorAckRecordEncoding(queueID, position),
- syncNonTransactional,
- getContext(syncNonTransactional));
+ ACKNOWLEDGE_CURSOR,
+ new CursorAckRecordEncoding(queueID, position),
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
public void deleteMessage(final long messageID) throws Exception
@@ -599,13 +567,13 @@
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
{
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
- .getID());
+ .getID());
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
- JournalStorageManager.SET_SCHEDULED_DELIVERY_TIME,
- encoding,
- syncNonTransactional,
- getContext(syncNonTransactional));
+ JournalStorageManager.SET_SCHEDULED_DELIVERY_TIME,
+ encoding,
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
@@ -613,10 +581,10 @@
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
messageJournal.appendAddRecord(recordID,
- JournalStorageManager.DUPLICATE_ID,
- encoding,
- syncNonTransactional,
- getContext(syncNonTransactional));
+ JournalStorageManager.DUPLICATE_ID,
+ encoding,
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
public void deleteDuplicateID(final long recordID) throws Exception
@@ -636,16 +604,16 @@
if (message.isLargeMessage())
{
messageJournal.appendAddRecordTransactional(txID,
- message.getMessageID(),
- JournalStorageManager.ADD_LARGE_MESSAGE,
- new LargeMessageEncoding(((LargeServerMessage)message)));
+ message.getMessageID(),
+ JournalStorageManager.ADD_LARGE_MESSAGE,
+ new LargeMessageEncoding(((LargeServerMessage) message)));
}
else
{
messageJournal.appendAddRecordTransactional(txID,
- message.getMessageID(),
- JournalStorageManager.ADD_MESSAGE,
- message);
+ message.getMessageID(),
+ JournalStorageManager.ADD_MESSAGE,
+ message);
}
}
@@ -655,43 +623,43 @@
pageTransaction.setRecordID(generateUniqueID());
messageJournal.appendAddRecordTransactional(txID,
- pageTransaction.getRecordID(),
- JournalStorageManager.PAGE_TRANSACTION,
- pageTransaction);
+ pageTransaction.getRecordID(),
+ JournalStorageManager.PAGE_TRANSACTION,
+ pageTransaction);
}
public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception
{
messageJournal.appendUpdateRecordTransactional(txID,
- pageTransaction.getRecordID(),
- JournalStorageManager.PAGE_TRANSACTION,
- new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
- depages));
+ pageTransaction.getRecordID(),
+ JournalStorageManager.PAGE_TRANSACTION,
+ new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
+ depages));
}
public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception
{
messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
- JournalStorageManager.PAGE_TRANSACTION,
- new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
- syncNonTransactional,
- getContext(syncNonTransactional));
+ JournalStorageManager.PAGE_TRANSACTION,
+ new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
messageJournal.appendUpdateRecordTransactional(txID,
- messageID,
- JournalStorageManager.ADD_REF,
- new RefEncoding(queueID));
+ messageID,
+ JournalStorageManager.ADD_REF,
+ new RefEncoding(queueID));
}
public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
messageJournal.appendUpdateRecordTransactional(txID,
- messageID,
- JournalStorageManager.ACKNOWLEDGE_REF,
- new RefEncoding(queueID));
+ messageID,
+ JournalStorageManager.ACKNOWLEDGE_REF,
+ new RefEncoding(queueID));
}
/* (non-Javadoc)
@@ -702,9 +670,9 @@
long ackID = idGenerator.generateID();
position.setRecordID(ackID);
messageJournal.appendAddRecordTransactional(txID,
- ackID,
- ACKNOWLEDGE_CURSOR,
- new CursorAckRecordEncoding(queueID, position));
+ ackID,
+ ACKNOWLEDGE_CURSOR,
+ new CursorAckRecordEncoding(queueID, position));
}
/* (non-Javadoc)
@@ -720,10 +688,10 @@
long id = generateUniqueID();
messageJournal.appendAddRecord(id,
- JournalStorageManager.HEURISTIC_COMPLETION,
- new HeuristicCompletionEncoding(xid, isCommit),
- true,
- getContext(true));
+ JournalStorageManager.HEURISTIC_COMPLETION,
+ new HeuristicCompletionEncoding(xid, isCommit),
+ true,
+ getContext(true));
return id;
}
@@ -740,12 +708,12 @@
public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
{
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
- .getID());
+ .getID());
messageJournal.appendUpdateRecordTransactional(txID,
- ref.getMessage().getMessageID(),
- JournalStorageManager.SET_SCHEDULED_DELIVERY_TIME,
- encoding);
+ ref.getMessage().getMessageID(),
+ JournalStorageManager.SET_SCHEDULED_DELIVERY_TIME,
+ encoding);
}
public void prepare(final long txID, final Xid xid) throws Exception
@@ -762,7 +730,7 @@
{
bindingsJournal.appendCommitRecord(txID, true);
}
-
+
public void rollbackBindings(final long txID) throws Exception
{
// no need to sync, it's going away anyways
@@ -820,14 +788,14 @@
{
ref.setPersistedCount(ref.getDeliveryCount());
DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
- ref.getDeliveryCount());
+ ref.getDeliveryCount());
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
- JournalStorageManager.UPDATE_DELIVERY_COUNT,
- updateInfo,
+ JournalStorageManager.UPDATE_DELIVERY_COUNT,
+ updateInfo,
- syncNonTransactional,
- getContext(syncNonTransactional));
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
}
@@ -905,8 +873,8 @@
Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
JournalLoadInformation info = messageJournal.load(records,
- preparedTransactions,
- new LargeMessageTXFailureCallback(messages));
+ preparedTransactions,
+ new LargeMessageTXFailureCallback(messages));
ArrayList<LargeServerMessage> largeMessages = new ArrayList<LargeServerMessage>();
@@ -921,7 +889,7 @@
// It will show log.info only with large journals (more than 1 million records)
if (reccount > 0 && reccount % 1000000 == 0)
{
- long percent = (long)((((double)reccount) / ((double)totalSize)) * 100f);
+ long percent = (long) ((((double) reccount) / ((double) totalSize)) * 100f);
log.info(percent + "% loaded");
}
@@ -1011,8 +979,8 @@
if (queueMessages == null)
{
log.error("Cannot find queue messages for queueID=" + encoding.queueID +
- " on ack for messageID=" +
- messageID);
+ " on ack for messageID=" +
+ messageID);
}
else
{
@@ -1094,9 +1062,9 @@
if (queueMessages == null)
{
log.error("Cannot find queue messages " + encoding.queueID +
- " for message " +
- messageID +
- " while processing scheduled messages");
+ " for message " +
+ messageID +
+ " while processing scheduled messages");
}
else
{
@@ -1158,7 +1126,7 @@
{
log.info("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR, deleting record now");
messageJournal.appendDeleteRecord(record.id, false);
-
+
}
break;
@@ -1271,14 +1239,14 @@
}
loadPreparedTransactions(postOffice,
- pagingManager,
- resourceManager,
- queues,
- queueInfos,
- preparedTransactions,
- duplicateIDMap,
- pageSubscriptions,
- pendingLargeMessages);
+ pagingManager,
+ resourceManager,
+ queues,
+ queueInfos,
+ preparedTransactions,
+ duplicateIDMap,
+ pageSubscriptions,
+ pendingLargeMessages);
for (PageSubscription sub : pageSubscriptions.values())
{
@@ -1290,7 +1258,7 @@
if (msg.getRefCount() == 0)
{
JournalStorageManager.log.info("Large message: " + msg.getMessageID() +
- " didn't have any associated reference, file will be deleted");
+ " didn't have any associated reference, file will be deleted");
msg.decrementDelayDeletionCount();
}
}
@@ -1302,7 +1270,7 @@
log.info("Deleting unreferenced message id=" + msg.getMessageID() + " from the journal");
try
{
- deleteMessage(msg.getMessageID());
+ deleteMessage(msg.getMessageID());
}
catch (Exception ignored)
{
@@ -1370,8 +1338,8 @@
public void addGrouping(final GroupBinding groupBinding) throws Exception
{
GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(),
- groupBinding.getGroupId(),
- groupBinding.getClusterName());
+ groupBinding.getGroupId(),
+ groupBinding.getClusterName());
bindingsJournal.appendAddRecord(groupBinding.getId(), JournalStorageManager.GROUP_RECORD, groupingEncoding, true);
}
@@ -1384,19 +1352,19 @@
public void addQueueBinding(final long tx, final Binding binding) throws Exception
{
- Queue queue = (Queue)binding.getBindable();
+ Queue queue = (Queue) binding.getBindable();
Filter filter = queue.getFilter();
SimpleString filterString = filter == null ? null : filter.getFilterString();
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(),
- binding.getAddress(),
- filterString);
+ binding.getAddress(),
+ filterString);
bindingsJournal.appendAddRecordTransactional(tx, binding.getID(),
- JournalStorageManager.QUEUE_BINDING_RECORD,
- bindingEncoding);
+ JournalStorageManager.QUEUE_BINDING_RECORD,
+ bindingEncoding);
}
public void deleteQueueBinding(final long queueBindingID) throws Exception
@@ -1411,9 +1379,9 @@
{
long recordID = idGenerator.generateID();
messageJournal.appendAddRecordTransactional(txID,
- recordID,
- JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
- new PageCountRecordInc(queueID, value));
+ recordID,
+ JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
+ new PageCountRecordInc(queueID, value));
return recordID;
}
@@ -1424,10 +1392,10 @@
{
long recordID = idGenerator.generateID();
messageJournal.appendAddRecord(recordID,
- JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
- new PageCountRecordInc(queueID, value),
- true,
- getContext());
+ JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
+ new PageCountRecordInc(queueID, value),
+ true,
+ getContext());
return recordID;
}
@@ -1438,9 +1406,9 @@
{
long recordID = idGenerator.generateID();
messageJournal.appendAddRecordTransactional(txID,
- recordID,
- JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE,
- new PageCountRecord(queueID, value));
+ recordID,
+ JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE,
+ new PageCountRecord(queueID, value));
return recordID;
}
@@ -1479,13 +1447,13 @@
ConfigurationImpl defaultValues = new ConfigurationImpl();
JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(),
- defaultValues.getJournalMinFiles(),
- 0,
- 0,
- messagesFF,
- "hornetq-data",
- "hq",
- 1);
+ defaultValues.getJournalMinFiles(),
+ 0,
+ 0,
+ messagesFF,
+ "hornetq-data",
+ "hq",
+ 1);
describeJournal(messagesFF, messagesJournal);
}
@@ -1568,8 +1536,8 @@
cleanupIncompleteFiles();
singleThreadExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-IO-SingleThread",
- true,
- getThisClassLoader()));
+ true,
+ getThisClassLoader()));
bindingsJournal.start();
@@ -1578,7 +1546,7 @@
started = true;
}
- public void stop() throws Exception
+ public void stop() throws Exception
{
stop(false);
}
@@ -1762,9 +1730,9 @@
{
// for compatibility: couple with old behaviour, copying the old file to avoid message loss
long originalMessageID = largeMessage.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
-
+
SequentialFile currentFile = createFileForLargeMessage(largeMessage.getMessageID(), true);
-
+
if (!currentFile.exists())
{
SequentialFile linkedFile = createFileForLargeMessage(originalMessageID, true);
@@ -1774,7 +1742,7 @@
linkedFile.close();
}
}
-
+
currentFile.close();
}
@@ -1847,7 +1815,7 @@
if (queue == null)
{
log.warn("Message in prepared tx for queue " + encoding.queueID +
- " which does not exist. This message will be ignored.");
+ " which does not exist. This message will be ignored.");
}
else
@@ -1946,9 +1914,9 @@
encoding.position.setRecordID(record.id);
PageSubscription sub = locateSubscription(encoding.queueID,
- pageSubscriptions,
- queueInfos,
- pagingManager);
+ pageSubscriptions,
+ queueInfos,
+ pagingManager);
if (sub != null)
{
@@ -1975,9 +1943,9 @@
encoding.decode(buff);
PageSubscription sub = locateSubscription(encoding.queueID,
- pageSubscriptions,
- queueInfos,
- pagingManager);
+ pageSubscriptions,
+ queueInfos,
+ pagingManager);
if (sub != null)
{
@@ -1994,7 +1962,7 @@
default:
{
JournalStorageManager.log.warn("InternalError: Record type " + recordType +
- " not recognized. Maybe you're using journal files created on a different version");
+ " not recognized. Maybe you're using journal files created on a different version");
}
}
}
@@ -2170,7 +2138,9 @@
}
- /** It's public as other classes may want to unparse data on tools*/
+ /**
+ * It's public as other classes may want to unparse data on tools
+ */
public static class XidEncoding implements EncodingSupport
{
public final Xid xid;
@@ -2331,13 +2301,13 @@
public String toString()
{
return "PersistentQueueBindingEncoding [id=" + id +
- ", name=" +
- name +
- ", address=" +
- address +
- ", filterString=" +
- filterString +
- "]";
+ ", name=" +
+ name +
+ ", address=" +
+ address +
+ ", filterString=" +
+ filterString +
+ "]";
}
public PersistentQueueBindingEncoding(final SimpleString name,
@@ -2391,7 +2361,7 @@
public int getEncodeSize()
{
return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) +
- SimpleString.sizeofNullableString(filterString);
+ SimpleString.sizeofNullableString(filterString);
}
}
@@ -2786,7 +2756,7 @@
// transaction until all the messages were added to the queue
// or else we could deliver the messages out of order
- PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+ PageTransactionInfo pageTransaction = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
if (pageTransaction != null)
{
@@ -2800,7 +2770,7 @@
public void afterRollback(final Transaction tx)
{
- PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+ PageTransactionInfo pageTransaction = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
if (tx.getState() == State.PREPARED && pageTransaction != null)
{
@@ -3047,12 +3017,12 @@
private static String describeRecord(RecordInfo info)
{
return "recordID=" + info.id +
- ";userRecordType=" +
- info.userRecordType +
- ";isUpdate=" +
- info.isUpdate +
- ";" +
- newObjectEncoding(info);
+ ";userRecordType=" +
+ info.userRecordType +
+ ";isUpdate=" +
+ info.isUpdate +
+ ";" +
+ newObjectEncoding(info);
}
private static String describeRecord(RecordInfo info, Object o)
@@ -3268,7 +3238,7 @@
Object value = msg.getObjectProperty(prop);
if (value instanceof byte[])
{
- buffer.append(prop + "=" + Arrays.toString((byte[])value) + ",");
+ buffer.append(prop + "=" + Arrays.toString((byte[]) value) + ",");
}
else
@@ -3383,10 +3353,10 @@
public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
{
out.println("operation@Prepare,txID=" + transactionID +
- ",numberOfRecords=" +
- numberOfRecords +
- ",extraData=" +
- encode(extraData));
+ ",numberOfRecords=" +
+ numberOfRecords +
+ ",extraData=" +
+ encode(extraData));
}
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
@@ -3463,7 +3433,7 @@
}
else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
{
- ReferenceDescribe ref = (ReferenceDescribe)o;
+ ReferenceDescribe ref = (ReferenceDescribe) o;
Integer count = messageRefCounts.get(ref.refEncoding.queueID);
if (count == null)
{
@@ -3477,7 +3447,7 @@
}
else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_REF)
{
- AckDescribe ref = (AckDescribe)o;
+ AckDescribe ref = (AckDescribe) o;
Integer count = messageRefCounts.get(ref.refEncoding.queueID);
if (count == null)
{
@@ -3507,7 +3477,7 @@
}
else if (info.getUserRecordType() == 32)
{
- ReferenceDescribe ref = (ReferenceDescribe)o;
+ ReferenceDescribe ref = (ReferenceDescribe) o;
Integer count = preparedMessageRefCount.get(ref.refEncoding.queueID);
if (count == null)
{
@@ -3555,9 +3525,272 @@
journal.stop();
}
+   /**
+    * Reads a bindings journal and a message journal and prints their contents as XML.
+    *
+    * @param bindingsDir directory handed to getBindings(String) to load the queue bindings
+    * @param messagesDir directory handed to processMessageRecords(...) to load messages and references
+    * @throws Exception if loading either journal fails
+    */
+   public static void describeJournalAsXML(String bindingsDir, String messagesDir) throws Exception
+   {
+      // message records keyed by record ID (the same ID keys the references below)
+      HashMap<Long, Message> messagesById = new HashMap<Long, Message>();
+
+      // references keyed first by the ID of the message record they belong to, then by queue ID
+      Map<Long, HashMap<Long, ReferenceDescribe>> refsByMessage = new HashMap<Long, HashMap<Long, ReferenceDescribe>>();
+
+      processMessageRecords(messagesDir, refsByMessage, messagesById);
+
+      printXML(getBindings(bindingsDir), refsByMessage, messagesById);
+   }
+
+   /**
+    * Loads the message journal stored in messagesDir and fills the supplied maps with the
+    * message and reference records it contains, then drops everything that was acknowledged.
+    * <p>
+    * ADD_MESSAGE records go into messages keyed by record ID; ADD_REF records go into messageRefs
+    * keyed by record ID and then by queue ID; ACKNOWLEDGE_REF records are collected and applied at the
+    * end through removeAcked(...). Other record types are ignored.
+    *
+    * @param messagesDir directory holding the message journal
+    * @param messageRefs output map of references, keyed by message record ID then queue ID
+    * @param messages    output map of messages, keyed by message record ID
+    * @throws Exception if the journal cannot be started, loaded or stopped
+    */
+   private static void processMessageRecords(String messagesDir, Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs, HashMap<Long, Message> messages) throws Exception
+   {
+      ArrayList<RecordInfo> acks = new ArrayList<RecordInfo>();
+
+      JournalImpl journal = openMessageJournal(messagesDir);
+
+      List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+      journal.start();
+
+      try
+      {
+         journal.load(records, null, null, false);
+
+         for (RecordInfo info : records)
+         {
+            Object o = newObjectEncoding(info);
+            if (info.getUserRecordType() == JournalStorageManager.ADD_MESSAGE)
+            {
+               messages.put(info.id, ((MessageDescribe) o).msg);
+            }
+            else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+            {
+               ReferenceDescribe ref = (ReferenceDescribe) o;
+               HashMap<Long, ReferenceDescribe> refsForMessage = messageRefs.get(info.id);
+               if (refsForMessage == null)
+               {
+                  // first reference seen for this message
+                  refsForMessage = new HashMap<Long, ReferenceDescribe>();
+                  messageRefs.put(info.id, refsForMessage);
+               }
+               refsForMessage.put(ref.refEncoding.queueID, ref);
+            }
+            else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_REF)
+            {
+               // applied after the whole journal was read, see removeAcked below
+               acks.add(info);
+            }
+         }
+      }
+      finally
+      {
+         // always release the journal, even when loading or decoding a record failed
+         journal.stop();
+      }
+
+      removeAcked(messageRefs, messages, acks);
+   }
+
+   /**
+    * Removes every acknowledged reference from messageRefs and, once a message has no references
+    * left, removes the message itself from both maps.
+    *
+    * @param messageRefs references keyed by message record ID, then by queue ID; modified in place
+    * @param messages    messages keyed by message record ID; modified in place
+    * @param acks        the ACKNOWLEDGE_REF records to apply
+    */
+   private static void removeAcked(Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs, HashMap<Long, Message> messages, ArrayList<RecordInfo> acks)
+   {
+      for (RecordInfo info : acks)
+      {
+         AckDescribe ack = (AckDescribe) newObjectEncoding(info);
+         HashMap<Long, ReferenceDescribe> refsForMessage = messageRefs.get(info.id);
+         if (refsForMessage == null)
+         {
+            // no reference is (or is any longer) recorded for this message, e.g. a previous ack
+            // already removed its last reference; nothing left to remove
+            continue;
+         }
+         refsForMessage.remove(ack.refEncoding.queueID);
+         if (refsForMessage.isEmpty())
+         {
+            messages.remove(info.id);
+            messageRefs.remove(info.id);
+         }
+      }
+   }
+
+ private static void printXML(HashMap<Long, PersistentQueueBindingEncoding> queueBindings, Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs, HashMap<Long, Message> messages)
+ {
+ try
+ {
+ XMLOutputFactory factory = XMLOutputFactory.newInstance();
+ XMLStreamWriter writer = factory.createXMLStreamWriter(System.out);
+ PrettyPrintHandler handler = new PrettyPrintHandler(writer);
+ XMLStreamWriter prettyWriter = (XMLStreamWriter) Proxy.newProxyInstance(
+ XMLStreamWriter.class.getClassLoader(),
+ new Class[]{XMLStreamWriter.class},
+ handler);
+
+ prettyWriter.writeStartDocument("1.0");
+ prettyWriter.writeStartElement("hornetq-journal");
+ prettyWriter.writeStartElement("bindings");
+ for (Map.Entry<Long, PersistentQueueBindingEncoding> queueBindingEncodingEntry : queueBindings.entrySet())
+ {
+ PersistentQueueBindingEncoding bindingEncoding = queueBindings.get(queueBindingEncodingEntry.getKey());
+ prettyWriter.writeEmptyElement("binding");
+ prettyWriter.writeAttribute("address", bindingEncoding.getAddress().toString());
+ String filter = "";
+ if (bindingEncoding.getFilterString() != null)
+ {
+ filter = bindingEncoding.getFilterString().toString();
+ }
+ prettyWriter.writeAttribute("filter-string", filter);
+ prettyWriter.writeAttribute("queue-name", bindingEncoding.getQueueName().toString());
+ prettyWriter.writeAttribute("id", new Long(bindingEncoding.getId()).toString());
+ }
+
+ prettyWriter.writeEndElement(); // end "bindings"
+ prettyWriter.flush();
+ prettyWriter.writeStartElement("messages");
+
+ for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet())
+ {
+ Message message = messageMapEntry.getValue();
+ prettyWriter.writeStartElement("message");
+ prettyWriter.writeAttribute("id", Long.toString(message.getMessageID()));
+ prettyWriter.writeAttribute("priority", Byte.toString(message.getPriority()));
+ prettyWriter.writeAttribute("expiration", Long.toString(message.getExpiration()));
+ prettyWriter.writeAttribute("timestamp", Long.toString(message.getTimestamp()));
+ String type = "default";
+ if (message.getType() == Message.BYTES_TYPE)
+ {
+ type = "bytes";
+ }
+ else if (message.getType() == Message.MAP_TYPE)
+ {
+ type = "map";
+ }
+ else if (message.getType() == Message.OBJECT_TYPE)
+ {
+ type = "object";
+ }
+ else if (message.getType() == Message.STREAM_TYPE)
+ {
+ type = "stream";
+ }
+ else if (message.getType() == Message.TEXT_TYPE)
+ {
+ type = "text";
+ }
+ prettyWriter.writeAttribute("type", type);
+ prettyWriter.writeAttribute("user-id", message.getUserID().toString());
+ prettyWriter.writeStartElement("body");
+ prettyWriter.writeCharacters(encode(message.getBodyBuffer().toByteBuffer().array()));
+ prettyWriter.writeEndElement();
+ prettyWriter.writeStartElement("properties");
+ for (SimpleString key : message.getPropertyNames())
+ {
+ Object value = message.getObjectProperty(key);
+ prettyWriter.writeEmptyElement("property");
+ prettyWriter.writeAttribute("name", key.toString());
+ prettyWriter.writeAttribute("value", value.toString());
+ if (value instanceof Boolean)
+ {
+ prettyWriter.writeAttribute("type", "boolean");
+ }
+ else if (value instanceof Byte)
+ {
+ prettyWriter.writeAttribute("type", "byte");
+ }
+ else if (value instanceof Short)
+ {
+ prettyWriter.writeAttribute("type", "short");
+ }
+ else if (value instanceof Integer)
+ {
+ prettyWriter.writeAttribute("type", "integer");
+ }
+ else if (value instanceof Long)
+ {
+ prettyWriter.writeAttribute("type", "long");
+ }
+ else if (value instanceof Float)
+ {
+ prettyWriter.writeAttribute("type", "float");
+ }
+ else if (value instanceof Double)
+ {
+ prettyWriter.writeAttribute("type", "double");
+ }
+ else if (value instanceof String)
+ {
+ prettyWriter.writeAttribute("type", "string");
+ }
+ else if (value instanceof SimpleString)
+ {
+ prettyWriter.writeAttribute("type", "simple-string");
+ }
+ }
+ prettyWriter.writeEndElement(); // end "properties"
+ prettyWriter.writeStartElement("queues");
+ HashMap<Long, ReferenceDescribe> refMap = messageRefs.get(messageMapEntry.getKey());
+ for (Map.Entry<Long, ReferenceDescribe> refMapEntry : refMap.entrySet())
+ {
+ prettyWriter.writeEmptyElement("queue");
+ prettyWriter.writeAttribute("name", queueBindings.get(refMapEntry.getValue().refEncoding.queueID).getQueueName().toString());
+ }
+
+ prettyWriter.writeEndElement(); // end "queues"
+ prettyWriter.writeEndElement(); // end "message"
+ }
+ prettyWriter.writeEndElement(); // end "messages"
+ prettyWriter.writeEndElement(); // end "hornetq-journal"
+ prettyWriter.flush();
+ prettyWriter.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+   /**
+    * Creates (without starting) a journal over the message files found in messagesDir.
+    *
+    * @param messagesDir directory holding the message journal files
+    * @return a JournalImpl using the "hornetq-data" file prefix and the "hq" extension
+    */
+   private static JournalImpl openMessageJournal(String messagesDir)
+   {
+      SequentialFileFactory journalFiles = new NIOSequentialFileFactory(messagesDir, null);
+
+      // Only default settings are used here. The load function should adapt to anything different
+      ConfigurationImpl defaults = new ConfigurationImpl();
+
+      JournalImpl messageJournal = new JournalImpl(defaults.getJournalFileSize(), defaults.getJournalMinFiles(), 0, 0, journalFiles, "hornetq-data", "hq", 1);
+
+      return messageJournal;
+   }
+
+   /**
+    * Loads the bindings journal stored in bindingsDir and returns its queue bindings.
+    * <p>
+    * Only QUEUE_BINDING_RECORD records are considered; every other record type is ignored.
+    *
+    * @param bindingsDir directory holding the bindings journal
+    * @return the decoded queue bindings keyed by PersistentQueueBindingEncoding.getId()
+    * @throws Exception if the journal cannot be started, loaded or stopped
+    */
+   private static HashMap<Long, PersistentQueueBindingEncoding> getBindings(String bindingsDir) throws Exception
+   {
+      JournalImpl journal = openBindingsJournal(bindingsDir);
+
+      List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+      HashMap<Long, PersistentQueueBindingEncoding> queueBindings = new HashMap<Long, PersistentQueueBindingEncoding>();
+
+      journal.start();
+
+      try
+      {
+         journal.load(records, null, null, false);
+
+         for (RecordInfo info : records)
+         {
+            if (info.getUserRecordType() == JournalStorageManager.QUEUE_BINDING_RECORD)
+            {
+               PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) newObjectEncoding(info);
+               queueBindings.put(bindingEncoding.getId(), bindingEncoding);
+            }
+         }
+      }
+      finally
+      {
+         // always release the journal, even when loading or decoding a record failed
+         journal.stop();
+      }
+
+      return queueBindings;
+   }
+
+   /**
+    * Creates (without starting) a journal over the bindings files found in bindingsDir.
+    *
+    * @param bindingsDir directory holding the bindings journal files
+    * @return a JournalImpl using the "hornetq-bindings" file prefix and the "bindings" extension
+    */
+   private static JournalImpl openBindingsJournal(String bindingsDir)
+   {
+      return new JournalImpl(1024 * 1024,
+                             2,
+                             -1,
+                             0,
+                             new NIOSequentialFileFactory(bindingsDir, null),
+                             "hornetq-bindings",
+                             "bindings",
+                             1);
+   }
+
private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID)
{
- TXLargeMessageConfirmationOperation txoper = (TXLargeMessageConfirmationOperation)tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
+ TXLargeMessageConfirmationOperation txoper = (TXLargeMessageConfirmationOperation) tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
if (txoper == null)
{
txoper = new TXLargeMessageConfirmationOperation();
@@ -3572,43 +3805,43 @@
public List<Long> confirmedMessages = new LinkedList<Long>();
/* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
- */
+ * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
+ */
public void beforePrepare(Transaction tx) throws Exception
{
}
/* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
- */
+ * @see org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
+ */
public void afterPrepare(Transaction tx)
{
}
/* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
- */
+ * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
+ */
public void beforeCommit(Transaction tx) throws Exception
{
}
/* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
- */
+ * @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
+ */
public void afterCommit(Transaction tx)
{
}
/* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
- */
+ * @see org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
+ */
public void beforeRollback(Transaction tx) throws Exception
{
}
/* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
- */
+ * @see org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
+ */
public void afterRollback(Transaction tx)
{
for (Long msg : confirmedMessages)
@@ -3620,16 +3853,16 @@
catch (Throwable e)
{
log.warn("Error while confirming large message completion on rollback for recordID=" + msg +
- "->" +
- e.getMessage(),
- e);
+ "->" +
+ e.getMessage(),
+ e);
}
}
}
/* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
- */
+ * @see org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
+ */
public List<MessageReference> getRelatedMessageReferences()
{
return null;
Added: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/PrettyPrintHandler.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/PrettyPrintHandler.java (rev 0)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/PrettyPrintHandler.java 2012-03-01 16:04:23 UTC (rev 12224)
@@ -0,0 +1,91 @@
+package org.hornetq.core.persistence.impl.journal;
+
+import javax.xml.stream.XMLStreamWriter;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Invocation handler for an {@link XMLStreamWriter} proxy that pretty-prints the output.
+ * <p>
+ * Before a writeStartElement, writeEndElement, writeEmptyElement or writeCharacters call is
+ * forwarded, a line feed followed by one INDENT_CHAR per nesting level is written to the wrapped
+ * writer. Every call, whatever its name, is then forwarded unchanged to the wrapped writer.
+ * <p>
+ * The handler keeps the current nesting depth as mutable state and does no synchronization.
+ */
+class PrettyPrintHandler implements InvocationHandler
+{
+
+   private static final String INDENT_CHAR = " ";
+   private static final String LINEFEED_CHAR = "\n";
+
+   private final XMLStreamWriter target;
+
+   // number of elements currently open
+   private int depth = 0;
+
+   /**
+    * @param target the writer that receives the indentation and all forwarded calls
+    */
+   public PrettyPrintHandler(XMLStreamWriter target)
+   {
+      this.target = target;
+   }
+
+   /**
+    * Writes the indentation required by the invoked method, then forwards the call to the target.
+    *
+    * @return whatever the target method returned (null for void methods)
+    * @throws Throwable the exception thrown by the target method itself, unwrapped, so that callers
+    *                   of the proxy see e.g. an XMLStreamException and not a reflection wrapper
+    */
+   public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+   {
+      String m = method.getName();
+
+      // Needs to be BEFORE the actual event, so that for instance the
+      // sequence writeStartElem, writeAttr, writeStartElem, writeEndElem, writeEndElem
+      // is correctly handled
+
+      if ("writeStartElement".equals(m))
+      {
+         // indent at the parent's level, then step into the new element
+         writeIndent();
+         depth++;
+      }
+      else if ("writeEndElement".equals(m))
+      {
+         // step out first so the end tag lines up with its start tag
+         depth--;
+         writeIndent();
+      }
+      else if ("writeEmptyElement".equals(m) || "writeCharacters".equals(m))
+      {
+         // neither opens a level: indent for the current depth only
+         writeIndent();
+      }
+
+      try
+      {
+         return method.invoke(target, args);
+      }
+      catch (java.lang.reflect.InvocationTargetException e)
+      {
+         // rethrow the writer's own exception; otherwise the proxy would wrap the
+         // reflection exception in an UndeclaredThrowableException
+         Throwable cause = e.getCause();
+         throw cause != null ? cause : e;
+      }
+   }
+
+   /** Writes a line feed followed by the indentation for the current depth to the target. */
+   private void writeIndent() throws javax.xml.stream.XMLStreamException
+   {
+      target.writeCharacters(LINEFEED_CHAR);
+      target.writeCharacters(repeat(depth, INDENT_CHAR));
+   }
+
+   /**
+    * @return s concatenated d times; the empty string when d is zero or negative
+    */
+   private String repeat(int d, String s)
+   {
+      StringBuilder repeated = new StringBuilder();
+      while (d-- > 0)
+      {
+         repeated.append(s);
+      }
+      return repeated.toString();
+   }
+}
\ No newline at end of file
Added: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/PrintDataAsXML.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/PrintDataAsXML.java (rev 0)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/PrintDataAsXML.java 2012-03-01 16:04:23 UTC (rev 12224)
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence.impl.journal;
+
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamWriter;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Command line entry point that prints the content of a bindings journal and a message journal
+ * as XML, by delegating to {@link JournalStorageManager#describeJournalAsXML(String, String)}.
+ *
+ * @author clebertsuconic
+ */
+public class PrintDataAsXML
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+
+   /**
+    * Expects exactly two arguments: the bindings directory followed by the message directory.
+    * With any other number of arguments the usage is printed and the JVM exits with status -1.
+    * An exception raised while describing the journals is printed with printStackTrace().
+    *
+    * @param arg the command line arguments
+    */
+   public static void main(String arg[])
+   {
+      if (arg.length != 2)
+      {
+         // the main class has to be part of the command, otherwise the printed usage cannot be run
+         System.out.println("Use: java -cp hornetq-core.jar org.hornetq.core.persistence.impl.journal.PrintDataAsXML <bindings directory> <message directory>");
+         System.exit(-1);
+      }
+
+      try
+      {
+         JournalStorageManager.describeJournalAsXML(arg[0], arg[1]);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
12 years, 2 months