JBoss hornetq SVN: r9510 - trunk.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-05 10:54:07 -0400 (Thu, 05 Aug 2010)
New Revision: 9510
Modified:
trunk/pom.xml
Log:
netty upgrade
* bump up netty to 3.2.1.Final
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2010-08-05 14:04:35 UTC (rev 9509)
+++ trunk/pom.xml 2010-08-05 14:54:07 UTC (rev 9510)
@@ -222,7 +222,7 @@
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
- <version>3.2.0.Final</version>
+ <version>3.2.1.Final</version>
</dependency>
<!--needed to compile the logging jar-->
<dependency>
13 years, 9 months
JBoss hornetq SVN: r9509 - trunk/src/main/org/hornetq/core/remoting/impl/netty.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-05 10:04:35 -0400 (Thu, 05 Aug 2010)
New Revision: 9509
Modified:
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
Log:
Netty acceptor pipelines
* use named handlers for STOMP_WS protocol and a static channel pipeline for the other protocols
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-08-05 12:50:31 UTC (rev 9508)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-08-05 14:04:35 UTC (rev 9509)
@@ -17,8 +17,9 @@
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
+import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@@ -329,15 +330,9 @@
ChannelPipelineFactory factory = new ChannelPipelineFactory()
{
- /**
- * we use named handlers so that the web socket server handler can
- * replace the http encode/decoder after the http handshake.
- *
- * @see WebSocketServerHandler#handleHttpRequest(ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest)
- */
public ChannelPipeline getPipeline() throws Exception
{
- List<ChannelHandler> handlers = new ArrayList<ChannelHandler>();
+ Map<String, ChannelHandler> handlers = new LinkedHashMap<String, ChannelHandler>();
if (sslEnabled)
{
@@ -347,40 +342,57 @@
SslHandler handler = new SslHandler(engine);
- handlers.add(handler);
+ handlers.put("ssl", handler);
}
if (httpEnabled)
{
- handlers.add(new HttpRequestDecoder());
+ handlers.put("http-decoder", new HttpRequestDecoder());
- handlers.add(new HttpResponseEncoder());
+ handlers.put("http-encoder", new HttpResponseEncoder());
- handlers.add(new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
+ handlers.put("http-handler", new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
}
if (protocol == ProtocolType.CORE)
{
- // Core protocol uses it's own optimised decoder
+ // Core protocol uses its own optimised decoder
- handlers.add(new HornetQFrameDecoder2());
+ handlers.put("hornetq-decode", new HornetQFrameDecoder2());
}
else if (protocol == ProtocolType.STOMP_WS)
{
- handlers.add(new HttpRequestDecoder());
- handlers.add(new HttpChunkAggregator(65536));
- handlers.add(new HttpResponseEncoder());
- handlers.add(new HornetQFrameDecoder(decoder));
- handlers.add(new WebSocketServerHandler());
+ handlers.put("http-decoder", new HttpRequestDecoder());
+ handlers.put("http-aggregator", new HttpChunkAggregator(65536));
+ handlers.put("http-encoder", new HttpResponseEncoder());
+ handlers.put("hornetq-decoder", new HornetQFrameDecoder(decoder));
+ handlers.put("websocket-handler", new WebSocketServerHandler());
}
else
{
- handlers.add(new HornetQFrameDecoder(decoder));
+ handlers.put("hornetq-decoder", new HornetQFrameDecoder(decoder));
}
- handlers.add(new HornetQServerChannelHandler(channelGroup, handler, new Listener()));
+ handlers.put("handler", new HornetQServerChannelHandler(channelGroup, handler, new Listener()));
- ChannelPipeline pipeline = new StaticChannelPipeline(handlers.toArray(new ChannelHandler[handlers.size()]));
+ /**
+ * STOMP_WS protocol mandates use of named handlers to be able to replace http codecs
+ * by websocket codecs after handshake.
+ * Other protocols can use a faster static channel pipeline directly.
+ */
+ ChannelPipeline pipeline;
+ if (protocol == ProtocolType.STOMP_WS)
+ {
+ pipeline = new DefaultChannelPipeline();
+ for (Entry<String, ChannelHandler> handler : handlers.entrySet())
+ {
+ pipeline.addLast(handler.getKey(), handler.getValue());
+ }
+ }
+ else
+ {
+ pipeline = new StaticChannelPipeline(handlers.values().toArray(new ChannelHandler[handlers.size()]));
+ }
return pipeline;
}
13 years, 9 months
JBoss hornetq SVN: r9508 - trunk/src/main/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-05 08:50:31 -0400 (Thu, 05 Aug 2010)
New Revision: 9508
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
Log:
STOMP
* fix the copy of a Core message's body to a Stomp frame (broken after r9484)
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-08-04 20:26:46 UTC (rev 9507)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-08-05 12:50:31 UTC (rev 9508)
@@ -91,10 +91,11 @@
headers.put(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
}
HornetQBuffer buffer = serverMessage.getBodyBuffer();
- buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+
int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
- : serverMessage.getEndOfBodyPosition();
+ : serverMessage.getEndOfBodyPosition();
int size = bodyPos - buffer.readerIndex();
+ buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
byte[] data = new byte[size];
if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
{
13 years, 9 months
JBoss hornetq SVN: r9507 - trunk/tests/src/org/hornetq/tests/integration/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-04 16:26:46 -0400 (Wed, 04 Aug 2010)
New Revision: 9507
Modified:
trunk/tests/src/org/hornetq/tests/integration/journal/NIOImportExportTest.java
Log:
small tweak
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOImportExportTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOImportExportTest.java 2010-08-04 18:40:08 UTC (rev 9506)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOImportExportTest.java 2010-08-04 20:26:46 UTC (rev 9507)
@@ -58,7 +58,7 @@
protected void tearDown() throws Exception
{
-
+ super.tearDown();
}
public void testExportImport() throws Exception
@@ -98,9 +98,9 @@
addTx(11, 11, 12);
updateTx(11, 11, 12);
commit(11);
-
+
journal.forceMoveNextFile();
-
+
update(11, 12);
stopJournal();
@@ -115,7 +115,6 @@
}
-
public void testExportImport3() throws Exception
{
setup(10, 10 * 1024, true);
@@ -164,7 +163,7 @@
startJournal();
loadAndCheck();
-
+
commit(11);
stopJournal();
@@ -176,8 +175,7 @@
startJournal();
loadAndCheck();
-
-
+
}
public void testExportImport2() throws Exception
13 years, 9 months
JBoss hornetq SVN: r9506 - trunk/tests/src/org/hornetq/tests/integration/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-04 14:40:08 -0400 (Wed, 04 Aug 2010)
New Revision: 9506
Modified:
trunk/tests/src/org/hornetq/tests/integration/journal/NIOImportExportTest.java
Log:
tweaks on test
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOImportExportTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOImportExportTest.java 2010-08-04 18:12:40 UTC (rev 9505)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOImportExportTest.java 2010-08-04 18:40:08 UTC (rev 9506)
@@ -96,7 +96,12 @@
commit(10);
addTx(11, 11, 12);
+ updateTx(11, 11, 12);
commit(11);
+
+ journal.forceMoveNextFile();
+
+ update(11, 12);
stopJournal();
13 years, 9 months
JBoss hornetq SVN: r9505 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-04 14:12:40 -0400 (Wed, 04 Aug 2010)
New Revision: 9505
Modified:
trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
Log:
HORNETQ-180 - Import / export tool shouldn't compact on import
Modified: trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-04 17:19:50 UTC (rev 9504)
+++ trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-04 18:12:40 UTC (rev 9505)
@@ -238,8 +238,6 @@
System.err.println("Error at line " + lineNumber + ", operation=" + operation + " msg = " + ex.getMessage());
}
}
-
- journal.compact();
journal.stop();
}
13 years, 9 months
JBoss hornetq SVN: r9504 - in trunk: tests/src/org/hornetq/tests/integration/journal and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-04 13:19:50 -0400 (Wed, 04 Aug 2010)
New Revision: 9504
Added:
trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java
trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOImportExportTest.java
Removed:
trunk/tests/src/org/hornetq/tests/util/ListJournal.java
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
Log:
HORNETQ-180 - Import / export tool
Added: trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-08-04 17:19:50 UTC (rev 9504)
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.hornetq.core.journal.*;
+
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.utils.Base64;
+
+/**
+ * Use this class to export the journal data. You can use it as a main class or through its static method {@link ExportJournal#exportJournal(String, String, String, int, int, String)}
+ *
+ * If you use the main method, use it as <JournalDirectory> <JournalPrefix> <FileExtension> <MinFiles> <FileSize> <FileOutput>
+ *
+ * Example: java -cp hornetq-core.jar org.hornetq.core.journal.impl.ExportJournal /journalDir hornetq-data hq 2 10485760 /tmp/export.dat
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ExportJournal
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public static void main(String arg[])
+ {
+ if (arg.length != 6)
+ {
+ System.err.println("Use: java -cp hornetq-core.jar org.hornetq.core.journal.impl.ExportJournal <JournalDirectory> <JournalPrefix> <FileExtension> <MinFiles> <FileSize> <FileOutput>");
+ return;
+ }
+
+ try
+ {
+ exportJournal(arg[0], arg[1], arg[2], Integer.parseInt(arg[3]), Integer.parseInt(arg[4]), arg[5]);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ }
+
+ public static void exportJournal(String directory,
+ String journalPrefix,
+ String journalSuffix,
+ int minFiles,
+ int fileSize,
+ String fileOutpu) throws Exception
+ {
+ NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+
+ JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
+
+ FileOutputStream fileOut = new FileOutputStream(new File(fileOutpu));
+
+ BufferedOutputStream buffOut = new BufferedOutputStream(fileOut);
+
+ PrintStream out = new PrintStream(buffOut);
+
+ List<JournalFile> files = journal.orderFiles();
+
+ for (JournalFile file : files)
+ {
+ out.println("#File," + file);
+
+ exportJournalFile(out, nio, file);
+ }
+
+ out.close();
+ }
+
+ /**
+ * @param out
+ * @param fileFactory
+ * @param file
+ * @throws Exception
+ */
+ public static void exportJournalFile(final PrintStream out, SequentialFileFactory fileFactory, JournalFile file) throws Exception
+ {
+ JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+ {
+
+ public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@UpdateTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+ }
+
+ public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@Update," + describeRecord(recordInfo));
+ }
+
+ public void onReadRollbackRecord(long transactionID) throws Exception
+ {
+ out.println("operation@Rollback,txID@" + transactionID);
+ }
+
+ public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+ {
+ out.println("operation@Prepare,txID@" + transactionID +
+ ",numberOfRecords@" +
+ numberOfRecords +
+ ",extraData@" +
+ encode(extraData));
+ }
+
+ public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@DeleteRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+ }
+
+ public void onReadDeleteRecord(long recordID) throws Exception
+ {
+ out.println("operation@DeleteRecord,id@" + recordID);
+ }
+
+ public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+ {
+ out.println("operation@Commit,txID@" + transactionID + ",numberOfRecords@" + numberOfRecords);
+ }
+
+ public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@AddRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+ }
+
+ public void onReadAddRecord(RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@AddRecord," + describeRecord(recordInfo));
+ }
+
+ public void markAsDataFile(JournalFile file)
+ {
+ }
+ });
+ }
+
+ private static String describeRecord(RecordInfo recordInfo)
+ {
+ return "id@" + recordInfo.id +
+ ",userRecordType@" +
+ recordInfo.userRecordType +
+ ",length@" +
+ recordInfo.data.length +
+ ",isUpdate@" +
+ recordInfo.isUpdate +
+ ",data@" +
+ encode(recordInfo.data);
+ }
+
+ private static String encode(byte[] data)
+ {
+ return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-04 17:19:50 UTC (rev 9504)
@@ -0,0 +1,358 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl.JournalRecord;
+import org.hornetq.utils.Base64;
+
+/**
+ * Use this class to import the journal data from a listed file. You can use it as a main class or through its static method {@link ImportJournal#importJournal(String, String, String, int, int, String)}
+ *
+ * If you use the main method, use it as <JournalDirectory> <JournalPrefix> <FileExtension> <MinFiles> <FileSize> <FileInput>
+ *
+ * Example: java -cp hornetq-core.jar org.hornetq.core.journal.impl.ImportJournal /journalDir hornetq-data hq 2 10485760 /tmp/export.dat
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ImportJournal
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public static void main(String arg[])
+ {
+ if (arg.length != 6)
+ {
+ System.err.println("Use: java -cp hornetq-core.jar org.hornetq.core.journal.impl.ImportJournal <JournalDirectory> <JournalPrefix> <FileExtension> <MinFiles> <FileSize> <FileOutput>");
+ return;
+ }
+
+ try
+ {
+ importJournal(arg[0], arg[1], arg[2], Integer.parseInt(arg[3]), Integer.parseInt(arg[4]), arg[5]);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ }
+
+ public static void importJournal(String directory,
+ String journalPrefix,
+ String journalSuffix,
+ int minFiles,
+ int fileSize,
+ String fileInput) throws Exception
+ {
+
+ File journalDir = new File(directory);
+
+ journalDir.mkdirs();
+
+ NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+
+ JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
+
+ if (journal.orderFiles().size() != 0)
+ {
+ throw new IllegalStateException("Import needs to create a brand new journal");
+ }
+
+ journal.start();
+
+ // The journal is empty, as we checked already. Calling load just to initialize the internal data
+ journal.loadInternalOnly();
+
+ FileInputStream fileInputStream = new FileInputStream(new File(fileInput));
+
+ BufferedReader reader = new BufferedReader(new InputStreamReader(fileInputStream));
+
+ String line;
+
+ HashMap<Long, AtomicInteger> txCounters = new HashMap<Long, AtomicInteger>();
+
+ long lineNumber = 0;
+
+ Map<Long, JournalRecord> journalRecords = journal.getRecords();
+
+ while ((line = reader.readLine()) != null)
+ {
+ lineNumber++;
+ String splitLine[] = line.split(",");
+ if (splitLine[0].equals("#File"))
+ {
+ txCounters.clear();
+ continue;
+ }
+
+ Properties lineProperties = parseLine(splitLine);
+
+ String operation = null;
+ try
+ {
+ operation = lineProperties.getProperty("operation");
+
+ if (operation.equals("AddRecord"))
+ {
+ RecordInfo info = parseRecord(lineProperties);
+ journal.appendAddRecord(info.id, info.userRecordType, info.data, false);
+ }
+ else if (operation.equals("AddRecordTX"))
+ {
+ long txID = parseLong("txID", lineProperties);
+ AtomicInteger counter = getCounter(txID, txCounters);
+ counter.incrementAndGet();
+ RecordInfo info = parseRecord(lineProperties);
+ journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
+ }
+ else if (operation.equals("AddRecordTX"))
+ {
+ long txID = parseLong("txID", lineProperties);
+ AtomicInteger counter = getCounter(txID, txCounters);
+ counter.incrementAndGet();
+ RecordInfo info = parseRecord(lineProperties);
+ journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
+ }
+ else if (operation.equals("UpdateTX"))
+ {
+ long txID = parseLong("txID", lineProperties);
+ AtomicInteger counter = getCounter(txID, txCounters);
+ counter.incrementAndGet();
+ RecordInfo info = parseRecord(lineProperties);
+ journal.appendUpdateRecordTransactional(txID, info.id, info.userRecordType, info.data);
+ }
+ else if (operation.equals("Update"))
+ {
+ RecordInfo info = parseRecord(lineProperties);
+ journal.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
+ }
+ else if (operation.equals("DeleteRecord"))
+ {
+ long id = parseLong("id", lineProperties);
+
+ // If not found it means the append/update records were reclaimed already
+ if (journalRecords.get((Long)id) != null)
+ {
+ journal.appendDeleteRecord(id, false);
+ }
+ }
+ else if (operation.equals("DeleteRecordTX"))
+ {
+ long txID = parseLong("txID", lineProperties);
+ long id = parseLong("id", lineProperties);
+ AtomicInteger counter = getCounter(txID, txCounters);
+ counter.incrementAndGet();
+
+ // If not found it means the append/update records were reclaimed already
+ if (journalRecords.get((Long)id) != null)
+ {
+ journal.appendDeleteRecordTransactional(txID, id);
+ }
+ }
+ else if (operation.equals("Prepare"))
+ {
+ long txID = parseLong("txID", lineProperties);
+ int numberOfRecords = parseInt("numberOfRecords", lineProperties);
+ AtomicInteger counter = getCounter(txID, txCounters);
+ byte[] data = parseEncoding("extraData", lineProperties);
+
+ if (counter.get() == numberOfRecords)
+ {
+ journal.appendPrepareRecord(txID, data, false);
+ }
+ else
+ {
+ System.err.println("Transaction " + txID +
+ " at line " +
+ lineNumber +
+ " is incomplete. The prepare record expected " +
+ numberOfRecords +
+ " while the import only had " +
+ counter);
+ }
+ }
+ else if (operation.equals("Commit"))
+ {
+ long txID = parseLong("txID", lineProperties);
+ int numberOfRecords = parseInt("numberOfRecords", lineProperties);
+ AtomicInteger counter = getCounter(txID, txCounters);
+ if (counter.get() == numberOfRecords)
+ {
+ journal.appendCommitRecord(txID, false);
+ }
+ else
+ {
+ System.err.println("Transaction " + txID +
+ " at line " +
+ lineNumber +
+ " is incomplete. The commit record expected " +
+ numberOfRecords +
+ " while the import only had " +
+ counter);
+ }
+ }
+ else if (operation.equals("Rollback"))
+ {
+ long txID = parseLong("txID", lineProperties);
+ journal.appendRollbackRecord(txID, false);
+ }
+ else
+ {
+ System.err.println("Invalid opeartion " + operation + " at line " + lineNumber);
+ }
+ }
+ catch (Exception ex)
+ {
+ System.err.println("Error at line " + lineNumber + ", operation=" + operation + " msg = " + ex.getMessage());
+ }
+ }
+
+ journal.compact();
+
+ journal.stop();
+ }
+
+ protected static AtomicInteger getCounter(Long txID, Map<Long, AtomicInteger> txCounters)
+ {
+
+ AtomicInteger counter = txCounters.get(txID);
+ if (counter == null)
+ {
+ counter = new AtomicInteger(0);
+ txCounters.put(txID, counter);
+ }
+
+ return counter;
+ }
+
+ protected static RecordInfo parseRecord(Properties properties) throws Exception
+ {
+ int id = parseInt("id", properties);
+ byte userRecordType = parseByte("userRecordType", properties);
+ boolean isUpdate = parseBoolean("isUpdate", properties);
+ byte[] data = parseEncoding("data", properties);
+ return new RecordInfo(id, userRecordType, data, isUpdate);
+ }
+
+ private static byte[] parseEncoding(String name, Properties properties) throws Exception
+ {
+ String value = parseString(name, properties);
+
+ return decode(value);
+ }
+
+ /**
+ * @param properties
+ * @return
+ */
+ private static int parseInt(String name, Properties properties) throws Exception
+ {
+ String value = parseString(name, properties);
+
+ return Integer.parseInt(value);
+ }
+
+ private static long parseLong(String name, Properties properties) throws Exception
+ {
+ String value = parseString(name, properties);
+
+ return Long.parseLong(value);
+ }
+
+ private static boolean parseBoolean(String name, Properties properties) throws Exception
+ {
+ String value = parseString(name, properties);
+
+ return Boolean.parseBoolean(value);
+ }
+
+ private static byte parseByte(String name, Properties properties) throws Exception
+ {
+ String value = parseString(name, properties);
+
+ return Byte.parseByte(value);
+ }
+
+ /**
+ * @param name
+ * @param properties
+ * @return
+ * @throws Exception
+ */
+ private static String parseString(String name, Properties properties) throws Exception
+ {
+ String value = properties.getProperty(name);
+
+ if (value == null)
+ {
+ throw new Exception("property " + name + " not found");
+ }
+ return value;
+ }
+
+ protected static Properties parseLine(String[] splitLine)
+ {
+ Properties properties = new Properties();
+
+ for (String el : splitLine)
+ {
+ String[] tuple = el.split("@");
+ if (tuple.length == 2)
+ {
+ properties.put(tuple[0], tuple[1]);
+ }
+ else
+ {
+ properties.put(tuple[0], tuple[0]);
+ }
+ }
+
+ return properties;
+ }
+
+ private static byte[] decode(String data)
+ {
+ return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-04 07:16:42 UTC (rev 9503)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-04 17:19:50 UTC (rev 9504)
@@ -13,7 +13,6 @@
package org.hornetq.core.journal.impl;
-import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -447,91 +446,10 @@
}
- /**
- * @param fileFactory
- * @param journal
- * @throws Exception
- */
- public static void listJournalFiles(final PrintStream out, final JournalImpl journal) throws Exception
- {
- List<JournalFile> files = journal.orderFiles();
-
- SequentialFileFactory fileFactory = journal.fileFactory;
- for (JournalFile file : files)
- {
- out.println("####### listing file " + file.getFile().getFileName() +
- " sequence = " +
- file.getFileID());
- listJournalFile(out, fileFactory, file);
- }
- }
- /**
- * @param out
- * @param fileFactory
- * @param file
- * @throws Exception
- */
- public static void listJournalFile(final PrintStream out, SequentialFileFactory fileFactory, JournalFile file) throws Exception
- {
- JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
- {
- public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
- {
- out.println("ReadUpdateTX, txID=" + transactionID + ", " + recordInfo);
- }
-
- public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
- {
- out.println("ReadUpdate " + recordInfo);
- }
-
- public void onReadRollbackRecord(long transactionID) throws Exception
- {
- out.println("Rollback txID=" + transactionID);
- }
-
- public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
- {
- out.println("Prepare txID=" + transactionID + ", numberOfRecords=" + numberOfRecords);
- }
-
- public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
- {
- out.println("DeleteRecordTX txID=" + transactionID + ", " + recordInfo);
- }
-
- public void onReadDeleteRecord(long recordID) throws Exception
- {
- out.println("DeleteRecord id=" + recordID);
- }
-
- public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
- {
- out.println("CommitRecord txID=" + transactionID + ", numberOfRecords=" + numberOfRecords);
- }
-
- public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
- {
- out.println("AddRecordTX, txID=" + transactionID + ", " + recordInfo);
- }
-
- public void onReadAddRecord(RecordInfo recordInfo) throws Exception
- {
- out.println("AddRecord " + recordInfo);
- }
-
- public void markAsDataFile(JournalFile file)
- {
- }
- });
- }
-
-
-
/** This method is used internally only; however, tools may use it for maintenance. */
public static int readJournalFile(final SequentialFileFactory fileFactory,
final JournalFile file,
@@ -3319,7 +3237,7 @@
while (nextFile == null)
{
- nextFile = openedFiles.poll(60, TimeUnit.SECONDS);
+ nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
if (nextFile == null)
{
JournalImpl.log.warn("Couldn't open a file in 60 Seconds",
Added: trunk/tests/src/org/hornetq/tests/integration/journal/NIOImportExportTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOImportExportTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOImportExportTest.java 2010-08-04 17:19:50 UTC (rev 9504)
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.journal;
+
+import java.io.File;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase;
+import org.hornetq.tests.unit.core.journal.impl.fakes.SimpleEncoding;
+
+/**
+ * A NIOImportExportTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class NIOImportExportTest extends JournalImplTestBase
+{
+
+ /* (non-Javadoc)
+ * @see org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
+ */
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new NIOSequentialFileFactory(getTestDir(), true);
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ protected void tearDown() throws Exception
+ {
+
+ }
+
+ public void testExportImport() throws Exception
+ {
+ setup(10, 10 * 1024, true);
+
+ createJournal();
+
+ startJournal();
+
+ load();
+
+ add(1, 2);
+
+ journal.forceMoveNextFile();
+
+ delete(1, 2);
+
+ add(3, 4);
+
+ journal.forceMoveNextFile();
+
+ addTx(5, 6, 7, 8);
+
+ journal.forceMoveNextFile();
+
+ addTx(5, 9);
+
+ commit(5);
+
+ journal.forceMoveNextFile();
+
+ deleteTx(10, 6, 7, 8, 9);
+
+ commit(10);
+
+ addTx(11, 11, 12);
+ commit(11);
+
+ stopJournal();
+
+ exportImportJournal();
+
+ createJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+ }
+
+
+ public void testExportImport3() throws Exception
+ {
+ setup(10, 10 * 1024, true);
+
+ createJournal();
+
+ startJournal();
+
+ load();
+
+ add(1, 2);
+
+ journal.forceMoveNextFile();
+
+ delete(1, 2);
+
+ add(3, 4);
+
+ journal.forceMoveNextFile();
+
+ addTx(5, 6, 7, 8);
+
+ journal.forceMoveNextFile();
+
+ addTx(5, 9);
+
+ commit(5);
+
+ journal.forceMoveNextFile();
+
+ deleteTx(10, 6, 7, 8, 9);
+
+ commit(10);
+
+ addTx(11, 12, 13);
+
+ EncodingSupport xid = new SimpleEncoding(10, (byte)0);
+ prepare(11, xid);
+
+ stopJournal();
+
+ exportImportJournal();
+
+ createJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+ commit(11);
+
+ stopJournal();
+
+ exportImportJournal();
+
+ createJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+
+ }
+
+ public void testExportImport2() throws Exception
+ {
+ setup(10, 10 * 1024, true);
+
+ createJournal();
+
+ startJournal();
+
+ load();
+
+ add(1);
+
+ stopJournal();
+
+ exportImportJournal();
+
+ createJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2010-08-04 07:16:42 UTC (rev 9503)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2010-08-04 17:19:50 UTC (rev 9504)
@@ -13,6 +13,8 @@
package org.hornetq.tests.unit.core.journal.impl;
+import java.io.File;
+import java.io.FilenameFilter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -28,6 +30,8 @@
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TestableJournal;
+import org.hornetq.core.journal.impl.ExportJournal;
+import org.hornetq.core.journal.impl.ImportJournal;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.util.UnitTestCase;
@@ -165,6 +169,50 @@
journal.stop();
}
+ /**
+ * @throws Exception
+ */
+ protected void exportImportJournal() throws Exception
+ {
+ System.out.println("Exporting to " + getTestDir() + "/output.log");
+
+ ExportJournal.exportJournal(getTestDir(),
+ this.filePrefix,
+ this.fileExtension,
+ this.minFiles,
+ this.fileSize,
+ getTestDir() + "/output.log");
+
+ File dir = new File(getTestDir());
+
+ FilenameFilter fnf = new FilenameFilter()
+ {
+ public boolean accept(final File file, final String name)
+ {
+ return name.endsWith("." + fileExtension);
+ }
+ };
+
+ System.out.println("file = " + dir);
+
+ File files[] = dir.listFiles(fnf);
+
+ for (File file : files)
+ {
+ System.out.println("Deleting " + file);
+ file.delete();
+ }
+
+ ImportJournal.importJournal(getTestDir(),
+ filePrefix,
+ fileExtension,
+ minFiles,
+ fileSize,
+ getTestDir() + "/output.log");
+ }
+
+
+
protected void loadAndCheck() throws Exception
{
loadAndCheck(false);
Deleted: trunk/tests/src/org/hornetq/tests/util/ListJournal.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ListJournal.java 2010-08-04 07:16:42 UTC (rev 9503)
+++ trunk/tests/src/org/hornetq/tests/util/ListJournal.java 2010-08-04 17:19:50 UTC (rev 9504)
@@ -1,128 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.util;
-
-import java.io.FileOutputStream;
-import java.io.PrintStream;
-import java.util.ArrayList;
-
-import org.hornetq.core.config.impl.FileConfiguration;
-import org.hornetq.core.journal.PreparedTransactionInfo;
-import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
-import org.hornetq.core.journal.impl.JournalImpl;
-
-/**
- * Lists the journal content for debug purposes.
- *
- * This is just a class useful on debug during development,
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- * Created Dec 12, 2008 11:42:35 AM
- *
- *
- */
-public class ListJournal
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public static void main(final String arg[])
- {
- try
- {
- FileConfiguration fileConf = new FileConfiguration();
-
- fileConf.setJournalDirectory("/work/projects/trunk/journal");
-
- // fileConf.setConfigurationUrl(arg[0]);
-
- fileConf.start();
-
- SequentialFileFactory fileFactory = new AIOSequentialFileFactory(fileConf.getJournalDirectory());
-
- JournalImpl journal = new JournalImpl(fileConf.getJournalFileSize(),
- 10,
- 0,
- 0,
- fileFactory,
- "hornetq-data",
- "hq",
- fileConf.getJournalMaxIO_NIO());
-
- ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
- ArrayList<PreparedTransactionInfo> prepared = new ArrayList<PreparedTransactionInfo>();
-
- journal.start();
-
-
- PrintStream out = new PrintStream(new FileOutputStream("/tmp/file.out"));
-
- out.println("######### Journal records per file");
-
- JournalImpl.listJournalFiles(out, journal);
-
- journal.load(records, prepared, null);
-
- out.println();
-
- out.println("##########################################");
- out.println("# T O T A L L I S T #");
-
- if (prepared.size() > 0)
- {
- out.println("There are " + prepared.size() + " prepared transactions on the journal");
- }
-
- out.println("Total of " + records.size() + " committed records");
-
- for (RecordInfo record : records)
- {
- out.println("user record: " + record);
- }
-
- journal.checkReclaimStatus();
-
- System.out.println("Data = " + journal.debug());
-
- journal.stop();
-
- out.close();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
-
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
13 years, 9 months
JBoss hornetq SVN: r9503 - trunk/docs/user-manual/zh.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-08-04 03:16:42 -0400 (Wed, 04 Aug 2010)
New Revision: 9503
Modified:
trunk/docs/user-manual/zh/client-classpath.xml
trunk/docs/user-manual/zh/management.xml
trunk/docs/user-manual/zh/perf-tuning.xml
Log:
doc sync
Modified: trunk/docs/user-manual/zh/client-classpath.xml
===================================================================
--- trunk/docs/user-manual/zh/client-classpath.xml 2010-08-04 05:26:04 UTC (rev 9502)
+++ trunk/docs/user-manual/zh/client-classpath.xml 2010-08-04 07:16:42 UTC (rev 9503)
@@ -29,11 +29,15 @@
<para>如果客户端只使用HornetQ内核(非JMS客户端),需要将 <literal
>hornetq-core-client.jar</literal>和
<literal>netty.jar</literal> 放到classpath中。</para>
- </section>
+ <para>如果客户端运行于<emphasis>Java 5 虚拟机</emphasis>上,
+ 请使用<literal>hornetq-core-client-java5.jar</literal>。</para>
+</section>
<section>
<title>JMS客户端</title>
<para>如果客户端使用JMS,需要在classpath上增加两个jar文件: <literal
>hornetq-jms-client.jar</literal> 和 <literal>jboss-jms-api.jar</literal>。</para>
+ <para>如果客户端运行于<emphasis>Java 5 虚拟机</emphasis>上,请使用
+ <literal>hornetq-jms-client-java5.jar</literal>。</para>
<note>
<para><literal>jboss-jms-api.jar</literal>中包含的只是 <literal>javax.jms.*</literal> 包中的接口类。
如果这些类已经在你的classpath中,则你就不需要这个jar文件。</para>
Modified: trunk/docs/user-manual/zh/management.xml
===================================================================
--- trunk/docs/user-manual/zh/management.xml 2010-08-04 05:26:04 UTC (rev 9502)
+++ trunk/docs/user-manual/zh/management.xml 2010-08-04 07:16:42 UTC (rev 9503)
@@ -113,6 +113,17 @@
<para><literal>HornetQServerControl</literal>提供了访问HornetQ服务器所有属性
的方法(例如<literal>getVersion()</literal>方法可以得到服务器的版本,等等)。 </para>
</listitem>
+ <listitem>
+ <para>核心桥和转发器的创建,删除与列表</para>
+ <para>使用<literal>getBridgeNames()</literal>可以列出部署的核心桥。
+ 使用<literal>getDivertNames()</literal>可以列出部署的转发器。</para>
+ <para>使用<literal>HornetQServerControl</literal> (ObjectName <literal
+ >org.hornetq:module=Core,type=Server</literal> 或资源名 <literal
+ >core.server</literal>)的方法<literal>createBridge()</literal>
+ 和<literal>destroyBridge()</literal>可以创建和删除核心桥。
+ 通过<literal>createDivert()</literal>和<literal>destroyDivert()</literal>
+ 可以创建和删除转发器。</para>
+ </listitem>
</itemizedlist>
</section>
<section>
Modified: trunk/docs/user-manual/zh/perf-tuning.xml
===================================================================
--- trunk/docs/user-manual/zh/perf-tuning.xml 2010-08-04 05:26:04 UTC (rev 9502)
+++ trunk/docs/user-manual/zh/perf-tuning.xml 2010-08-04 07:16:42 UTC (rev 9503)
@@ -149,8 +149,12 @@
<itemizedlist>
<listitem>
<para>TCP缓存大小。如果你的网络速度很快,并且你的主机也很快,你可以通过增加TCP的发送和接收缓存
- 来提高性能。参见<xref linkend="configuring-transports"/>中的详细说明。
- </para>
+ 来提高性能。参见<xref linkend="configuring-transports"/>中的详细说明。</para>
+ <note>
+ <para>注意某些操作系统,如最近的Linux版本中,包括了TCP自动优化功能。如果再手工设置TCP缓存
+ 会导致自动优化失效,最终使性能下降!
+ </para>
+ </note>
</listitem>
<listitem>
<para>增加服务器中文件句柄数量限制。如果你的服务器将要处理很多并行的连接,或者客户端在快速不停地
13 years, 9 months
JBoss hornetq SVN: r9502 - trunk/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-04 01:26:04 -0400 (Wed, 04 Aug 2010)
New Revision: 9502
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
tweaks on delete messages after restart
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-03 19:21:41 UTC (rev 9501)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-04 05:26:04 UTC (rev 9502)
@@ -1016,7 +1016,16 @@
if (!referencedMessages.contains(msg.getMessageID()))
{
log.info("Deleting unreferenced message id=" + msg.getMessageID() + " from the journal");
- deleteMessage(msg.getMessageID());
+ // Something after routing could delete messages
+ // So we ignore eventual failures
+ try
+ {
+ deleteMessage(msg.getMessageID());
+ }
+ catch (Exception ignored)
+ {
+ log.warn("It wasn't possible to delete message " + msg.getMessageID());
+ }
}
}
13 years, 9 months
JBoss hornetq SVN: r9501 - trunk/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-03 15:21:41 -0400 (Tue, 03 Aug 2010)
New Revision: 9501
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
small tweak
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-03 18:39:46 UTC (rev 9500)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-03 19:21:41 UTC (rev 9501)
@@ -1570,6 +1570,9 @@
*/
public void executeOnCompletion(final IOAsyncTask runnable)
{
+ // There are no executeOnCompletion calls while using the DummyOperationContext
+ // However we keep the code here for correctness
+ runnable.done();
}
/* (non-Javadoc)
13 years, 9 months