JBoss hornetq SVN: r9191 - in trunk: tests/src/org/hornetq/tests/unit/core/journal/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-04-30 14:39:43 -0400 (Fri, 30 Apr 2010)
New Revision: 9191
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-318 - Longs on FileIDs
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-04-30 14:56:42 UTC (rev 9190)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-04-30 18:39:43 UTC (rev 9191)
@@ -43,6 +43,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOAsyncTask;
@@ -2763,26 +2764,31 @@
SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
file.open(1, false);
-
- long fileID = readFileHeader(file);
-
- if (nextFileID.get() < fileID)
+
+ try
{
- nextFileID.set(fileID);
+ long fileID = readFileHeader(file);
+
+ if (nextFileID.get() < fileID)
+ {
+ nextFileID.set(fileID);
+ }
+
+ long fileNameID = getFileNameID(fileName);
+
+ // The compactor could create a fileName but use a previously assigned ID.
+ // Because of that we need to take both parts into account
+ if (nextFileID.get() < fileNameID)
+ {
+ nextFileID.set(fileNameID);
+ }
+
+ orderedFiles.add(new JournalFileImpl(file, fileID));
}
-
- int fileNameID = getFileNameID(fileName);
-
- // The compactor could create a fileName but use a previously assigned ID.
- // Because of that we need to take both parts into account
- if (nextFileID.get() < fileNameID)
+ finally
{
- nextFileID.set(fileNameID);
+ file.close();
}
-
- orderedFiles.add(new JournalFileImpl(file, fileID));
-
- file.close();
}
// Now order them by ordering id - we can't use the file name for ordering
@@ -2806,8 +2812,19 @@
int journalVersion = bb.getInt();
- int userVersion = bb.getInt();
+ if (journalVersion != FORMAT_VERSION)
+ {
+ throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch");
+ }
+
+ int readUserVersion = bb.getInt();
+
+ if (readUserVersion != userVersion)
+ {
+ throw new HornetQException(HornetQException.IO_ERROR, "Journal data belong to a different version");
+ }
+
long fileID = bb.getLong();
fileFactory.releaseBuffer(bb);
@@ -2821,7 +2838,7 @@
* @param sequentialFile
* @throws Exception
*/
- static int initFileHeader(final SequentialFileFactory fileFactory, final SequentialFile sequentialFile, final int userVersion, final long fileID) throws Exception
+ public static int initFileHeader(final SequentialFileFactory fileFactory, final SequentialFile sequentialFile, final int userVersion, final long fileID) throws Exception
{
// We don't need to release buffers while writing.
ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
@@ -2846,7 +2863,7 @@
* @param userVersion
* @param fileID
*/
- static void writeHeader(HornetQBuffer buffer, final int userVersion, final long fileID)
+ public static void writeHeader(HornetQBuffer buffer, final int userVersion, final long fileID)
{
buffer.writeInt(FORMAT_VERSION);
@@ -2953,11 +2970,11 @@
}
/** Get the ID part of the name */
- private int getFileNameID(final String fileName)
+ private long getFileNameID(final String fileName)
{
try
{
- return Integer.parseInt(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
+ return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
}
catch (Throwable e)
{
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-04-30 14:56:42 UTC (rev 9190)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-04-30 18:39:43 UTC (rev 9191)
@@ -13,12 +13,15 @@
package org.hornetq.tests.unit.core.journal.impl;
+import java.nio.ByteBuffer;
import java.util.List;
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.unit.core.journal.impl.fakes.SimpleEncoding;
@@ -186,7 +189,114 @@
}
}
+
+
+ public void testVersionCheck() throws Exception
+ {
+ setup(10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+
+ stopJournal();
+
+ fileFactory.start();
+ List<String> files = fileFactory.listFiles(fileExtension);
+
+ for (String fileStr : files)
+ {
+
+ SequentialFile file = fileFactory.createSequentialFile(fileStr, 1);
+
+ ByteBuffer buffer = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
+
+ for (int i = 0 ; i < JournalImpl.SIZE_HEADER; i++)
+ {
+ buffer.put(Byte.MAX_VALUE);
+ }
+
+ buffer.rewind();
+
+ file.open();
+
+ file.position(0);
+
+ file.writeDirect(buffer, sync);
+
+ file.close();
+ }
+
+ fileFactory.stop();
+
+ startJournal();
+
+ boolean exceptionHappened = false;
+ try
+ {
+ load();
+ }
+ catch (HornetQException e)
+ {
+ exceptionHappened = true;
+ assertEquals(HornetQException.IO_ERROR, e.getCode());
+ }
+
+ assertTrue("Exception was expected", exceptionHappened);
+ stopJournal();
+
+
+ }
+
+ // Validates if the journal will work when the IDs are over MaxInt
+ public void testMaxInt() throws Exception
+ {
+ setup(10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+
+ stopJournal();
+
+ fileFactory.start();
+
+ List<String> files = fileFactory.listFiles(fileExtension);
+
+ long fileID = Integer.MAX_VALUE;
+ for (String fileStr : files)
+ {
+ SequentialFile file = fileFactory.createSequentialFile(fileStr, 1);
+
+ file.open();
+
+ JournalImpl.initFileHeader(fileFactory, file, journal.getUserVersion(), fileID++);
+
+ file.close();
+ }
+
+ fileFactory.stop();
+
+ startJournal();
+
+ load();
+
+ for (long i = 0 ; i < 100; i++)
+ {
+ add(i);
+
+ stopJournal();
+
+ startJournal();
+
+ loadAndCheck();
+ }
+
+ stopJournal();
+
+ }
+
+
+
public void testFilesImmediatelyAfterload() throws Exception
{
try
14 years, 10 months
JBoss hornetq SVN: r9190 - trunk/src/main/org/hornetq/ra.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-04-30 10:56:42 -0400 (Fri, 30 Apr 2010)
New Revision: 9190
Modified:
trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-349: Clarify XA/transaction behavior of resource adapter
* use a non-transacted JMS session for work performed outside a TX branch (JCA 1.6 §7.15.1.1)
Modified: trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java 2010-04-30 12:41:09 UTC (rev 9189)
+++ trunk/src/main/org/hornetq/ra/HornetQRAManagedConnection.java 2010-04-30 14:56:42 UTC (rev 9190)
@@ -90,8 +90,11 @@
// Physical JMS connection stuff
private Connection connection;
+ // auto-commit session, used outside XA or Local transaction
private Session session;
+ private Session transactedSession;
+
private XASession xaSession;
private XAResource xaResource;
@@ -124,6 +127,7 @@
connection = null;
session = null;
+ transactedSession = null;
xaSession = null;
xaResource = null;
@@ -251,6 +255,11 @@
session.close();
}
+ if (transactedSession != null)
+ {
+ transactedSession.close();
+ }
+
if (xaSession != null)
{
xaSession.close();
@@ -268,7 +277,7 @@
}
catch (Throwable e)
{
- throw new ResourceException("Could not properly close the session and connection", e);
+ throw new ResourceException("Could not properly close the transactedSession and connection", e);
}
}
@@ -562,12 +571,24 @@
}
else
{
- if (HornetQRAManagedConnection.trace)
+ if (inManagedTx)
{
- HornetQRAManagedConnection.log.trace("getSession() -> session " + xaSession.getSession());
+ if (HornetQRAManagedConnection.trace)
+ {
+ HornetQRAManagedConnection.log.trace("getSession() -> transactedSession " + transactedSession);
+ }
+
+ return transactedSession;
}
+ else
+ {
+ if (HornetQRAManagedConnection.trace)
+ {
+ HornetQRAManagedConnection.log.trace("getSession() -> session " + session);
+ }
- return session;
+ return session;
+ }
}
}
@@ -737,7 +758,8 @@
connection.setExceptionListener(this);
xaSession = ((XATopicConnection)connection).createXATopicSession();
- session = ((TopicConnection)connection).createTopicSession(transacted, acknowledgeMode);
+ transactedSession = ((TopicConnection)connection).createTopicSession(transacted, acknowledgeMode);
+ session = ((TopicConnection)connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
}
else if (cri.getType() == HornetQRAConnectionFactory.QUEUE_CONNECTION)
{
@@ -753,7 +775,8 @@
connection.setExceptionListener(this);
xaSession = ((XAQueueConnection)connection).createXAQueueSession();
- session = ((QueueConnection)connection).createQueueSession(transacted, acknowledgeMode);
+ transactedSession = ((QueueConnection)connection).createQueueSession(transacted, acknowledgeMode);
+ session = ((QueueConnection)connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
}
else
{
@@ -769,7 +792,8 @@
connection.setExceptionListener(this);
xaSession = ((XAConnection)connection).createXASession();
- session = connection.createSession(transacted, acknowledgeMode);
+ transactedSession = connection.createSession(transacted, acknowledgeMode);
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
}
catch (JMSException je)
14 years, 10 months
JBoss hornetq SVN: r9189 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-04-30 08:41:09 -0400 (Fri, 30 Apr 2010)
New Revision: 9189
Modified:
trunk/docs/user-manual/en/persistence.xml
Log:
https://jira.jboss.org/jira/browse/HORNETQ-293
Modified: trunk/docs/user-manual/en/persistence.xml
===================================================================
--- trunk/docs/user-manual/en/persistence.xml 2010-04-30 12:21:36 UTC (rev 9188)
+++ trunk/docs/user-manual/en/persistence.xml 2010-04-30 12:41:09 UTC (rev 9189)
@@ -67,6 +67,8 @@
<para>The AIO journal is only available when running Linux kernel 2.6 or later and after
having installed libaio (if it's not already installed). For instructions on how to
install libaio please see <xref linkend="installing-aio"/>.</para>
+ <para>Also, please note that AIO will only work with the following file systems: ext2, ext3, ext4, jfs, xfs. With other file systems,
+ e.g. NFS, it may appear to work, but it will fall back to a slower synchronous behaviour. Don't put the journal on an NFS share!</para>
<para>For more information on libaio please see <xref linkend="libaio"/>.</para>
<para>libaio is part of the kernel project.</para>
</listitem>
14 years, 10 months
JBoss hornetq SVN: r9188 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-04-30 08:21:36 -0400 (Fri, 30 Apr 2010)
New Revision: 9188
Modified:
trunk/docs/user-manual/en/paging.xml
Log:
https://jira.jboss.org/jira/browse/HORNETQ-368
Modified: trunk/docs/user-manual/en/paging.xml
===================================================================
--- trunk/docs/user-manual/en/paging.xml 2010-04-29 23:17:36 UTC (rev 9187)
+++ trunk/docs/user-manual/en/paging.xml 2010-04-30 12:21:36 UTC (rev 9188)
@@ -125,7 +125,7 @@
<entry>10MiB (10 * 1024 * 1024 bytes)</entry>
</row>
<row>
- <entry><literal>address-full-message-policy</literal></entry>
+ <entry><literal>address-full-policy</literal></entry>
<entry>This must be set to PAGE for paging to enable. If the value
is PAGE then further messages will be paged to disk. If the
value is DROP then further messages will be silently dropped. If
14 years, 10 months
JBoss hornetq SVN: r9187 - trunk/tests/src/org/hornetq/tests/integration/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-04-29 19:17:36 -0400 (Thu, 29 Apr 2010)
New Revision: 9187
Modified:
trunk/tests/src/org/hornetq/tests/integration/journal/JournalPerfTuneTest.java
Log:
just a tweak
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/JournalPerfTuneTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/JournalPerfTuneTest.java 2010-04-29 23:01:30 UTC (rev 9186)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/JournalPerfTuneTest.java 2010-04-29 23:17:36 UTC (rev 9187)
@@ -64,7 +64,7 @@
final String extension = "hq";
final int maxIO = 500;
- final String journalDir = "/jbm-data/journal-test";
+ final String journalDir = getTestDir();
final int bufferSize = 490 * 1024;
final int bufferTimeout = (int)(1000000000d / 2000);
final boolean logRates = true;
14 years, 10 months
JBoss hornetq SVN: r9186 - in trunk: src/main/org/hornetq/core/journal/impl and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-04-29 19:01:30 -0400 (Thu, 29 Apr 2010)
New Revision: 9186
Modified:
trunk/src/main/org/hornetq/core/journal/Journal.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-318 - using a long to the fileID - first commit
Modified: trunk/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/Journal.java 2010-04-29 16:21:28 UTC (rev 9185)
+++ trunk/src/main/org/hornetq/core/journal/Journal.java 2010-04-29 23:01:30 UTC (rev 9186)
@@ -114,6 +114,8 @@
int getAlignment() throws Exception;
int getNumberOfRecords();
+
+ int getUserVersion();
void perfBlast(int pages) throws Exception;
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-04-29 16:21:28 UTC (rev 9185)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-04-29 23:01:30 UTC (rev 9186)
@@ -55,10 +55,8 @@
protected SequentialFile sequentialFile;
- protected int fileID;
+ protected long nextOrderingID;
- protected int nextOrderingID;
-
private HornetQBuffer writingChannel;
private final Set<Long> recordsSnapshot = new ConcurrentHashSet<Long>();
@@ -72,7 +70,7 @@
protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory,
final JournalImpl journal,
final Set<Long> recordsSnapshot,
- final int nextOrderingID)
+ final long nextOrderingID)
{
super();
this.journal = journal;
@@ -95,11 +93,8 @@
{
controlFile.open(1, false);
- HornetQBuffer renameBuffer = HornetQBuffers.dynamicBuffer(1);
+ JournalImpl.initFileHeader(fileFactory, controlFile, 0, 0);
- renameBuffer.writeInt(-1);
- renameBuffer.writeInt(-1);
-
HornetQBuffer filesToRename = HornetQBuffers.dynamicBuffer(1);
// DataFiles first
@@ -155,12 +150,16 @@
new ByteArrayEncoding(filesToRename.toByteBuffer()
.array()));
- controlRecord.setFileID(-1);
+
+ HornetQBuffer renameBuffer = HornetQBuffers.dynamicBuffer(filesToRename.writerIndex());
+
+ controlRecord.setFileID(0);
+
controlRecord.encode(renameBuffer);
ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
-
+
writeBuffer.put(renameBuffer.toByteBuffer().array(), 0, renameBuffer.writerIndex());
writeBuffer.rewind();
@@ -208,16 +207,18 @@
flush();
ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize());
+
writingChannel = HornetQBuffers.wrappedBuffer(bufferWrite);
currentFile = journal.getFile(false, false, false, true);
+
sequentialFile = currentFile.getFile();
sequentialFile.open(1, false);
- fileID = nextOrderingID++;
- currentFile = new JournalFileImpl(sequentialFile, fileID);
- writingChannel.writeInt(fileID);
+ currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++);
+
+ JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
}
protected void addToRecordsSnaptshot(final long id)
@@ -235,7 +236,7 @@
protected void writeEncoder(final JournalInternalRecord record) throws Exception
{
- record.setFileID(fileID);
+ record.setFileID(currentFile.getRecordID());
record.encode(getWritingChannel());
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2010-04-29 16:21:28 UTC (rev 9185)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2010-04-29 23:01:30 UTC (rev 9186)
@@ -55,7 +55,7 @@
protected JournalCleaner(final SequentialFileFactory fileFactory,
final JournalImpl journal,
final Set<Long> recordsSnapshot,
- final int nextOrderingID) throws Exception
+ final long nextOrderingID) throws Exception
{
super(fileFactory, journal, recordsSnapshot, nextOrderingID);
openFile();
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-04-29 16:21:28 UTC (rev 9185)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-04-29 23:01:30 UTC (rev 9186)
@@ -69,7 +69,7 @@
if (controlFile.exists())
{
- JournalFile file = new JournalFileImpl(controlFile, -1);
+ JournalFile file = new JournalFileImpl(controlFile, 0);
final ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -140,7 +140,7 @@
public JournalCompactor(final SequentialFileFactory fileFactory,
final JournalImpl journal,
final Set<Long> recordsSnapshot,
- final int firstFileID)
+ final long firstFileID)
{
super(fileFactory, journal, recordsSnapshot, firstFileID);
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-04-29 16:21:28 UTC (rev 9185)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-04-29 23:01:30 UTC (rev 9186)
@@ -56,7 +56,11 @@
long getOffset();
- int getFileID();
+ /** This is a field to identify that records on this file actually belong to the current file.
+ * The possible implementation for this is fileID & Integer.MAX_VALUE */
+ int getRecordID();
+
+ long getFileID();
SequentialFile getFile();
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-04-29 16:21:28 UTC (rev 9185)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-04-29 23:01:30 UTC (rev 9186)
@@ -35,7 +35,9 @@
private final SequentialFile file;
- private final int fileID;
+ private final long fileID;
+
+ private final int recordID;
private long offset;
@@ -49,11 +51,13 @@
private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
- public JournalFileImpl(final SequentialFile file, final int fileID)
+ public JournalFileImpl(final SequentialFile file, final long fileID)
{
this.file = file;
this.fileID = fileID;
+
+ this.recordID = (int)(fileID & (long)Integer.MAX_VALUE);
}
public void clearCounts()
@@ -132,10 +136,15 @@
return offset;
}
- public int getFileID()
+ public long getFileID()
{
return fileID;
}
+
+ public int getRecordID()
+ {
+ return recordID;
+ }
public void setOffset(final long offset)
{
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-04-29 16:21:28 UTC (rev 9185)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-04-29 23:01:30 UTC (rev 9186)
@@ -69,7 +69,7 @@
/**
*
- * <p>A JournalImpl</p
+ * <p>A circular log implementation.</p
*
* <p>Look at {@link JournalImpl#load(LoaderCallback)} for the file layout
*
@@ -88,6 +88,8 @@
private static final int STATE_LOADED = 2;
+ private static final int FORMAT_VERSION = 1;
+
// Static --------------------------------------------------------
private static final Logger log = Logger.getLogger(JournalImpl.class);
@@ -109,7 +111,8 @@
public static final int MIN_FILE_SIZE = 1024;
- public static final int SIZE_HEADER = DataConstants.SIZE_INT;
+ // FileID(Long) + JournalVersion + UserVersion
+ public static final int SIZE_HEADER = DataConstants.SIZE_LONG + DataConstants.SIZE_INT + DataConstants.SIZE_INT;
public static final int BASIC_SIZE = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_INT;
@@ -163,8 +166,10 @@
private volatile boolean autoReclaim = true;
- private final AtomicInteger nextFileID = new AtomicInteger(0);
+ private final AtomicLong nextFileID = new AtomicLong(0);
+ private final int userVersion;
+
private final int maxAIO;
private final int fileSize;
@@ -220,65 +225,16 @@
// Constructors --------------------------------------------------
- public void runDirectJournalBlast() throws Exception
+ public JournalImpl(final int fileSize,
+ final int minFiles,
+ final int compactMinFiles,
+ final int compactPercentage,
+ final SequentialFileFactory fileFactory,
+ final String filePrefix,
+ final String fileExtension,
+ final int maxAIO)
{
- final int numIts = 100000000;
-
- JournalImpl.log.info("*** running direct journal blast: " + numIts);
-
- final CountDownLatch latch = new CountDownLatch(numIts * 2);
-
- class MyIOAsyncTask implements IOCompletion
- {
- public void done()
- {
- latch.countDown();
- }
-
- public void onError(final int errorCode, final String errorMessage)
- {
-
- }
-
- public void storeLineUp()
- {
- }
- }
-
- final MyIOAsyncTask task = new MyIOAsyncTask();
-
- final int recordSize = 1024;
-
- final byte[] bytes = new byte[recordSize];
-
- class MyRecord implements EncodingSupport
- {
-
- public void decode(final HornetQBuffer buffer)
- {
- }
-
- public void encode(final HornetQBuffer buffer)
- {
- buffer.writeBytes(bytes);
- }
-
- public int getEncodeSize()
- {
- return recordSize;
- }
-
- }
-
- MyRecord record = new MyRecord();
-
- for (int i = 0; i < numIts; i++)
- {
- appendAddRecord(i, (byte)1, record, true, task);
- appendDeleteRecord(i, true, task);
- }
-
- latch.await();
+ this(fileSize, minFiles, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, 0);
}
public JournalImpl(final int fileSize,
@@ -288,7 +244,8 @@
final SequentialFileFactory fileFactory,
final String filePrefix,
final String fileExtension,
- final int maxIO)
+ final int maxAIO,
+ final int userVersion)
{
if (fileFactory == null)
{
@@ -316,7 +273,7 @@
{
throw new NullPointerException("fileExtension is null");
}
- if (maxIO <= 0)
+ if (maxAIO <= 0)
{
throw new IllegalStateException("maxAIO should aways be a positive number");
}
@@ -347,9 +304,72 @@
this.fileExtension = fileExtension;
- maxAIO = maxIO;
+ this.maxAIO = maxAIO;
+
+ this.userVersion = userVersion;
}
+ public void runDirectJournalBlast() throws Exception
+ {
+ final int numIts = 100000000;
+
+ JournalImpl.log.info("*** running direct journal blast: " + numIts);
+
+ final CountDownLatch latch = new CountDownLatch(numIts * 2);
+
+ class MyIOAsyncTask implements IOCompletion
+ {
+ public void done()
+ {
+ latch.countDown();
+ }
+
+ public void onError(final int errorCode, final String errorMessage)
+ {
+
+ }
+
+ public void storeLineUp()
+ {
+ }
+ }
+
+ final MyIOAsyncTask task = new MyIOAsyncTask();
+
+ final int recordSize = 1024;
+
+ final byte[] bytes = new byte[recordSize];
+
+ class MyRecord implements EncodingSupport
+ {
+
+ public void decode(final HornetQBuffer buffer)
+ {
+ }
+
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeBytes(bytes);
+ }
+
+ public int getEncodeSize()
+ {
+ return recordSize;
+ }
+
+ }
+
+ MyRecord record = new MyRecord();
+
+ for (int i = 0; i < numIts; i++)
+ {
+ appendAddRecord(i, (byte)1, record, true, task);
+ appendDeleteRecord(i, true, task);
+ }
+
+ latch.await();
+ }
+
public Map<Long, JournalRecord> getRecords()
{
return records;
@@ -576,7 +596,7 @@
// This record is from a previous file-usage. The file was
// reused and we need to ignore this record
- if (readFileId != file.getFileID())
+ if (readFileId != file.getRecordID())
{
// If a file has damaged pendingTransactions, we make it a dataFile, and the
// next reclaiming will fix it
@@ -2442,6 +2462,11 @@
{
return maxAIO;
}
+
+ public int getUserVersion()
+ {
+ return userVersion;
+ }
// In some tests we need to force the journal to move to a next file
public void forceMoveNextFile() throws Exception
@@ -2631,25 +2656,17 @@
// Discard the old JournalFile and set it with a new ID
private JournalFile reinitializeFile(final JournalFile file) throws Exception
{
- int newFileID = generateFileID();
+ long newFileID = generateFileID();
SequentialFile sf = file.getFile();
sf.open(1, false);
- sf.position(0);
+ int position = initFileHeader(this.fileFactory, sf, userVersion, newFileID);
- ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
-
- bb.putInt(newFileID);
-
- bb.rewind();
-
- sf.writeDirect(bb, true);
-
JournalFile jf = new JournalFileImpl(sf, newFileID);
- sf.position(bb.limit());
+ sf.position(position);
sf.close();
@@ -2747,16 +2764,8 @@
file.open(1, false);
- ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
+ long fileID = readFileHeader(file);
- file.read(bb);
-
- int fileID = bb.getInt();
-
- fileFactory.releaseBuffer(bb);
-
- bb = null;
-
if (nextFileID.get() < fileID)
{
nextFileID.set(fileID);
@@ -2784,6 +2793,68 @@
return orderedFiles;
}
+ /**
+ * @param file
+ * @return
+ * @throws Exception
+ */
+ private long readFileHeader(SequentialFile file) throws Exception
+ {
+ ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
+
+ file.read(bb);
+
+ int journalVersion = bb.getInt();
+
+ int userVersion = bb.getInt();
+
+ long fileID = bb.getLong();
+
+ fileFactory.releaseBuffer(bb);
+
+ bb = null;
+ return fileID;
+ }
+
+ /**
+ * @param fileID
+ * @param sequentialFile
+ * @throws Exception
+ */
+ static int initFileHeader(final SequentialFileFactory fileFactory, final SequentialFile sequentialFile, final int userVersion, final long fileID) throws Exception
+ {
+ // We don't need to release buffers while writing.
+ ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
+
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bb);
+
+ writeHeader(buffer, userVersion, fileID);
+
+ bb.rewind();
+
+ int bufferSize = bb.limit();
+
+ sequentialFile.position(0);
+
+ sequentialFile.writeDirect(bb, true);
+
+ return bufferSize;
+ }
+
+ /**
+ * @param buffer
+ * @param userVersion
+ * @param fileID
+ */
+ static void writeHeader(HornetQBuffer buffer, final int userVersion, final long fileID)
+ {
+ buffer.writeInt(FORMAT_VERSION);
+
+ buffer.writeInt(userVersion);
+
+ buffer.writeLong(fileID);
+ }
+
/**
*
* @param completeTransaction If the appendRecord is for a prepare or commit, where we should update the number of pendingTransactions on the current file
@@ -2867,7 +2938,7 @@
}
// Adding fileID
- encoder.setFileID(currentFile.getFileID());
+ encoder.setFileID(currentFile.getRecordID());
if (callback != null)
{
@@ -2903,10 +2974,10 @@
*/
private JournalFile createFile(final boolean keepOpened,
final boolean multiAIO,
- final boolean fill,
+ final boolean init,
final boolean tmpCompact) throws Exception
{
- int fileID = generateFileID();
+ long fileID = generateFileID();
String fileName;
@@ -2930,17 +3001,11 @@
sequentialFile.open(1, false);
- if (fill)
+ if (init)
{
sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
- ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
-
- bb.putInt(fileID);
-
- bb.rewind();
-
- sequentialFile.writeDirect(bb, true);
+ initFileHeader(this.fileFactory, sequentialFile, userVersion, fileID);
}
long position = sequentialFile.position();
@@ -2979,7 +3044,7 @@
file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
}
- private int generateFileID()
+ private long generateFileID()
{
return nextFileID.incrementAndGet();
}
@@ -3098,7 +3163,7 @@
*/
JournalFile getFile(final boolean keepOpened,
final boolean multiAIO,
- final boolean fill,
+ final boolean initFile,
final boolean tmpCompactExtension) throws Exception
{
JournalFile nextOpenedFile = null;
@@ -3117,7 +3182,7 @@
if (nextOpenedFile == null)
{
- nextOpenedFile = createFile(keepOpened, multiAIO, fill, tmpCompactExtension);
+ nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
}
else
{
@@ -3435,8 +3500,8 @@
{
public int compare(final JournalFile f1, final JournalFile f2)
{
- int id1 = f1.getFileID();
- int id2 = f2.getFileID();
+ long id1 = f1.getFileID();
+ long id2 = f2.getFileID();
return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2010-04-29 16:21:28 UTC (rev 9185)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2010-04-29 23:01:30 UTC (rev 9186)
@@ -533,8 +533,15 @@
public void runDirectJournalBlast() throws Exception
{
- // TODO Auto-generated method stub
+ localJournal.runDirectJournalBlast();
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#getUserVersion()
+ */
+ public int getUserVersion()
+ {
+ return localJournal.getUserVersion();
}
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2010-04-29 16:21:28 UTC (rev 9185)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2010-04-29 23:01:30 UTC (rev 9186)
@@ -1078,8 +1078,14 @@
public void runDirectJournalBlast() throws Exception
{
- // TODO Auto-generated method stub
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#getUserVersion()
+ */
+ public int getUserVersion()
+ {
+ return 0;
}
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-04-29 16:21:28 UTC (rev 9185)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-04-29 23:01:30 UTC (rev 9186)
@@ -662,7 +662,7 @@
public void testReclaimAddUpdateDeleteDifferentFiles1() throws Exception
{
// Make sure there is one record per file
- setup(2, calculateRecordSize(8, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
+ setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
getAlignment()), true);
createJournal();
startJournal();
@@ -701,7 +701,7 @@
public void testReclaimAddUpdateDeleteDifferentFiles2() throws Exception
{
// Make sure there is one record per file
- setup(2, calculateRecordSize(8, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
+ setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
getAlignment()), true);
createJournal();
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java 2010-04-29 16:21:28 UTC (rev 9185)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java 2010-04-29 23:01:30 UTC (rev 9186)
@@ -792,7 +792,7 @@
return 0;
}
- public int getFileID()
+ public long getFileID()
{
return 0;
}
@@ -977,5 +977,13 @@
this.needCleanup = needCleanup;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalFile#getRecordID()
+ */
+ public int getRecordID()
+ {
+ return 0;
+ }
}
}
14 years, 10 months
JBoss hornetq SVN: r9185 - branches/HnetQ_323_cn/docs/user-manual/zh.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-04-29 12:21:28 -0400 (Thu, 29 Apr 2010)
New Revision: 9185
Modified:
branches/HnetQ_323_cn/docs/user-manual/zh/clusters.xml
Log:
done
Modified: branches/HnetQ_323_cn/docs/user-manual/zh/clusters.xml
===================================================================
--- branches/HnetQ_323_cn/docs/user-manual/zh/clusters.xml 2010-04-29 14:06:37 UTC (rev 9184)
+++ branches/HnetQ_323_cn/docs/user-manual/zh/clusters.xml 2010-04-29 16:21:28 UTC (rev 9185)
@@ -19,71 +19,50 @@
<!-- ============================================================================= -->
<chapter id="clusters">
- <title>Clusters</title>
+ <title>集群</title>
<section>
- <title>Clusters Overview</title>
- <para>HornetQ clusters allow groups of HornetQ servers to be grouped
- together in order to share message processing load. Each active node in the cluster is
- an active HornetQ server which manages its own messages and handles its own
- connections. A server must be configured to be clustered, you will need to set the
- <literal>clustered</literal> element in the <literal>hornetq-configuration.xml</literal>
- configuration file to <literal>true</literal>, this is <literal>false</literal> by
- default.</para>
- <para>The cluster is formed by each node declaring <emphasis>cluster connections</emphasis>
- to other nodes in the core configuration file <literal>hornetq-configuration.xml</literal>.
- When a node forms a cluster connection to another node, internally it creates a <emphasis>core
- bridge</emphasis> (as described in <xref
- linkend="core-bridges" />) connection between it and the other node,
- this is done transparently behind the scenes - you don't have to declare an explicit
- bridge for each node. These cluster connections allow messages to flow between the nodes
- of the cluster to balance load.</para>
- <para>Nodes can be connected together to form a cluster in many different topologies, we
- will discuss a couple of the more common topologies later in this chapter.</para>
- <para>We'll also discuss client side load balancing, where we can balance client connections
- across the nodes of the cluster, and we'll consider message redistribution where HornetQ
- will redistribute messages between nodes to avoid starvation.</para>
- <para>Another important part of clustering is <emphasis>server discovery</emphasis> where
- servers can broadcast their connection details so clients or other servers can connect
- to them with the minimum of configuration.</para>
+ <title>集群概述</title>
+ <para>HornetQ集群是由一组HornetQ服务器组成的集合,它们协同合作进行消息处理。集群中每个主节点就是一个
+ HornetQ服务器,它管理自己的连接并处理自己的消息。要将一个HornetQ服务器配置为集群服务器,需要将配置
+ 文件<literal>hornetq-configuration.xml</literal>中<literal>clustered</literal>的值设
+ 为<literal>true</literal>。默认值是<literal>false</literal>。</para>
+ <para>要组成一个集群,每个节点都要在其核心配置文件<literal>hornetq-configuration.xml</literal>
+ 中声明<emphasis>集群连接</emphasis>,用来建立与集群中其它节点的通迅。每两个节点间都是通过内部的一个
+ <emphasis>核心桥</emphasis>(参见<xref linkend="core-bridges" />)连接的。这些连接的建立是
+ 透明的--你不需要为每个连接显式地声明一个桥。集群连接的作用是在集群的各个节点间进行负载平衡。</para>
+ <para>HornetQ可以采用不同的拓扑结构来组成集群。本章后面将讲述几种常用的拓扑结构。</para>
+ <para>我们还将讨论客户端的负载均衡--客户端如何均衡其与集群各节点的连接,以及消息的再分配--在节点间合理
+ 的分配消息以避免消息匮乏(starvation)。</para>
+ <para>本章还涉及集群的另一个重要方面--<emphasis>服务器发现</emphasis>,即服务器通过广播的方式将
+ 自己的连接信息告诉客户端或其它服务器,以使它们能与其建立连接,不需要额外的配置。</para>
</section>
<section id="clusters.server-discovery">
- <title>Server discovery</title>
- <para>Server discovery is a mechanism by which servers can broadcast their connection
- settings across the network. This is useful for two purposes:</para>
+ <title>服务器发现</title>
+ <para>服务器发现是指服务器通过广播的方式将自己的连接设置发送到网络上的机制,它有两个目的:</para>
<itemizedlist>
<listitem>
- <para>Discovery by messaging clients. A messaging client wants to be able to connect
- to the servers of the cluster without having specific knowledge of which servers
- in the cluster are up at any one time. Messaging clients
- <emphasis>can</emphasis> be initialised with an explicit list of the servers
- in a cluster, but this is not flexible or maintainable as servers are added or
- removed from the cluster.</para>
+ <para>被消息客户端发现。客户端接到广播后可以知道集群中有哪些服务器处于工作状态以及如何与它们
+ 建立连接。虽然客户端<emphasis>可以</emphasis>在初始化时接受一个集群服务器的列表,
+ 但是这样做与广播方式相比不够灵活。比如集群中有服务器离开或新加入时,列表的方式不能及时更新这些信息。</para>
</listitem>
<listitem>
- <para>Discovery by other servers. Servers in a cluster want to be able to create
- cluster connections to each other without having prior knowledge of all the
- other servers in the cluster. </para>
+ <para>被其它服务器发现。通过广播,服务器之间可以自动建立彼此间的连接,不需要事先知道集群中其它
+ 服务器的信息。</para>
</listitem>
</itemizedlist>
- <para>Server discovery uses <ulink url="http://en.wikipedia.org/wiki/User_Datagram_Protocol"
- >UDP</ulink> multicast to broadcast server connection settings. If UDP is disabled
- on your network you won't be able to use this, and will have to specify servers
- explicitly when setting up a cluster or using a messaging client.</para>
+ <para>服务器发现使用<ulink url="http://en.wikipedia.org/wiki/User_Datagram_Protocol"
+ >UDP</ulink>协议来广播连接设置。如果网络中UDP被关闭,则不能使用服务器发现功能。只有用显式
+ 地指定服务器的方法来设置集群或集群的客户端。</para>
<section id="clusters.broadcast-groups">
- <title>Broadcast Groups</title>
- <para>A broadcast group is the means by which a server broadcasts connectors over the
- network. A connector defines a way in which a client (or other server) can make
- connections to the server. For more information on what a connector is, please see
- <xref linkend="configuring-transports" />.</para>
- <para>The broadcast group takes a set of connector pairs, each connector pair contains
- connection settings for a live and (optional) backup server and broadcasts them on
- the network. It also defines the UDP address and port settings. </para>
- <para>Broadcast groups are defined in the server configuration file <literal
- >hornetq-configuration.xml</literal>. There can be many broadcast groups per HornetQ
- server. All broadcast groups must be defined in a <literal
- >broadcast-groups</literal> element.</para>
- <para>Let's take a look at an example broadcast group from <literal
- >hornetq-configuration.xml</literal>:</para>
+ <title>广播组</title>
+ <para>服务器以广播组的方式来广播它的连接器信息。连接器定义了如何与该服务器建立连接的信息。关于连接器更多的
+ 解释,请参见<xref linkend="configuring-transports" />。</para>
+ <para>广播组包括了一系列的连接器对。每个连接器对由主服务器的连接器和备份(可选)服务器连接器信息组成。
+ 广播组还定义了所使用的UDP的地址和端口信息。</para>
+ <para>广播组配置在服务器配置文件<literal
+ >hornetq-configuration.xml</literal>中。一个HornetQ服务器可以有多个广播组。所有的广播组
+ 必须定义在<literal>broadcast-groups</literal>内。</para>
+ <para>让我们来看一个<literal>hornetq-configuration.xml</literal>文件中广播组的例子:</para>
<programlisting><broadcast-groups>
<broadcast-group name="my-broadcast-group">
<local-bind-port>54321</local-bind-port>
@@ -91,87 +70,65 @@
<group-port>9876</group-port>
<broadcast-period>1000</broadcast-period>
<connector-ref connector-name="netty-connector"
- backup-connector-name="backup-connector"/>
+ backup-connector="backup-connector"/>
</broadcast-group>
</broadcast-groups></programlisting>
- <para>Some of the broadcast group parameters are optional and you'll normally use the
- defaults, but we specify them all in the above example for clarity. Let's discuss
- each one in turn:</para>
+ <para>有些广播组的参数是可选的,通常情况下可以使用默认值。在上面例子中我们为了说明目的给出了这些参数。
+ 下面是这些参数的说明:</para>
<itemizedlist>
<listitem>
- <para><literal>name</literal> attribute. Each broadcast group in the server must
- have a unique name. </para>
+ <para><literal>name</literal>。每个广播组需要有一个唯一的名字。</para>
</listitem>
<listitem>
- <para><literal>local-bind-address</literal>. This is the local bind
- address that the datagram socket is bound to. If you have multiple network
- interfaces on your server, you would specify which one you wish to use for
- broadcasts by setting this property. If this property is not specified then
- the socket will be bound to the wildcard address, an IP address chosen by
- the kernel.</para>
+ <para><literal>local-bind-address</literal>。这个参数是套接字的本地绑定地址。如果在服务器
+ 中有多个网络接口卡时,必须要指定使用的是哪个接口。如果这个参数没有指定,那么将使用系统内核
+ 所选定的IP地址。</para>
</listitem>
<listitem>
- <para><literal>local-bind-port</literal>. If you want to specify a local port to
- which the datagram socket is bound you can specify it here. Normally you
- would just use the default value of <literal>-1</literal> which signifies
- that an anonymous port should be used.</para>
+ <para><literal>local-bind-port</literal>。这个参数指定了套接字的本地绑定端口。通常情况下
+ 可以使用其默认值<literal>-1</literal>,表示使用随机的端口。</para>
</listitem>
<listitem>
- <para><literal>group-address</literal>. This is the multicast address to which
- the data will be broadcast. It is a class D IP address in the range <literal
- >224.0.0.0</literal> to <literal>239.255.255.255</literal>, inclusive.
- The address <literal>224.0.0.0</literal> is reserved and is not available
- for use. This parameter is mandatory.</para>
+ <para><literal>group-address</literal>。这个参数指定的是广播地址。它是一个D类的IP地址,
+ 取值范围是<literal>224.0.0.0</literal>到<literal>239.255.255.255</literal>。
+ 地址<literal>224.0.0.0</literal>是保留地址,所以不能使用。这个参数是必须指定的。</para>
</listitem>
<listitem>
- <para><literal>group-port</literal>. This is the UDP port number used for
- broadcasting. This parameter is mandatory.</para>
+ <para><literal>group-port</literal>。这个参数设定广播的UDP端口。
+ 是一个必须指定的参数。</para>
</listitem>
<listitem>
- <para><literal>broadcast-period</literal>. This is the period in milliseconds
- between consecutive broadcasts. This parameter is optional, the default
- value is <literal>1000</literal> milliseconds.</para>
+ <para><literal>broadcast-period</literal>。指定两次广播之间的时间间隔,单位毫秒。
+ 这是一个可选参数,它的默认值是<literal>1000</literal>毫秒。</para>
</listitem>
<listitem>
- <para><literal>connector-ref</literal>. This specifies the connector and
- optional backup connector that will be broadcasted (see <xref
- linkend="configuring-transports" /> for more information on
- connectors). The connector to be broadcasted is specified by the <literal
- >connector-name</literal> attribute, and the backup connector to be
- broadcasted is specified by the <literal>backup-connector</literal> attribute.
- The <literal>backup-connector</literal> attribute is optional.</para>
+ <para><literal>connector-ref</literal>。这个参数指定了要广播的连接器以及可选的备份连接器。
+ (参见<xref linkend="configuring-transports" />)。
+ <literal>connector-name</literal>属性的值是连接器的名字,
+ <literal>backup-connector</literal>属性是备份连接器的名字,是可选属性。</para>
</listitem>
</itemizedlist>
</section>
<section id="clusters.discovery-groups">
- <title>Discovery Groups</title>
- <para>While the broadcast group defines how connector information is broadcasted from a
- server, a discovery group defines how connector information is received from a
- multicast address.</para>
- <para>A discovery group maintains a list of connector pairs - one for each broadcast by
- a different server. As it receives broadcasts on the multicast group address from a
- particular server it updates its entry in the list for that server.</para>
- <para>If it has not received a broadcast from a particular server for a length of time
- it will remove that server's entry from its list.</para>
- <para>Discovery groups are used in two places in HornetQ:</para>
+ <title>发现组</title>
+ <para>广播组规定了如何广播连接器的信息,发现组则定义的如何接收连接器的信息。</para>
+ <para>一个发现组包括了一系列的连接器对--每个连接器对代表一个不同的服务器广播的连接器信息。每当接收一次广播,
+ 这个连接对的列表就被更新一次。</para>
+ <para>如果在一定时间内没有收到某个服务器的广播,则其相应的连接器对将从列表中删除。</para>
+ <para>发现组在HornetQ中有两处应用:</para>
<itemizedlist>
<listitem>
- <para>By cluster connections so they know what other servers in the cluster they
- should make connections to.</para>
+ <para>在创建集群连接时用来判断集群中哪些服务器是可以连接的。</para>
</listitem>
<listitem>
- <para>By messaging clients so they can discovery what servers in the cluster
- they can connect to.</para>
+ <para>客户端用来发现哪些服务器可以连接。</para>
</listitem>
</itemizedlist>
</section>
<section>
- <title>Defining Discovery Groups on the Server</title>
- <para>For cluster connections, discovery groups are defined in the server side
- configuration file <literal>hornetq-configuration.xml</literal>. All discovery groups
- must be defined inside a <literal>discovery-groups</literal> element. There can be
- many discovery groups defined by HornetQ server. Let's look at an
- example:</para>
+ <title>在服务器端定义发现组。</title>
+ <para>服务器端的发现组定义在<literal>hornetq-configuration.xml</literal>配置文件中。所有的发现组都必须
+ 在<literal>discovery-groups</literal>内定义。发现组可以定义多个。请看下面的例子:</para>
<programlisting><discovery-groups>
<discovery-group name="my-discovery-group">
<group-address>231.7.7.7</group-address>
@@ -179,65 +136,47 @@
<refresh-timeout>10000</refresh-timeout>
</discovery-group>
</discovery-groups></programlisting>
- <para>We'll consider each parameter of the discovery group:</para>
+ <para>下面是对每个参数的解释:</para>
<itemizedlist>
<listitem>
- <para><literal>name</literal> attribute. Each discovery group must have a unique
- name per server.</para>
+ <para><literal>name</literal>属性。每个发现组都必须有一个唯一的名字。</para>
</listitem>
<listitem>
- <para><literal>group-address</literal>. This is the multicast ip address of the
- group to listen on. It should match the <literal>group-address</literal> in
- the broadcast group that you wish to listen from. This parameter is
- mandatory.</para>
+ <para><literal>group-address</literal>。需要监听的广播地址。它需要与广播组的
+ <literal>group-address</literal>一致才可以收到广播组的信息。这是一个必要参数。</para>
</listitem>
<listitem>
- <para><literal>group-port</literal>. This is the UDP port of the multicast
- group. It should match the <literal>group-port</literal> in the broadcast
- group that you wish to listen from. This parameter is mandatory.</para>
+ <para><literal>group-port</literal>。需要监听的UDP端口。它需要与广播组的
+ <literal>group-port</literal>值相同才可以收到广播组的信息。这是一个必要参数。</para>
</listitem>
<listitem>
- <para><literal>refresh-timeout</literal>. This is the period the discovery group
- waits after receiving the last broadcast from a particular server before
- removing that servers connector pair entry from its list. You would normally
- set this to a value significantly higher than the <literal
- >broadcast-period</literal> on the broadcast group otherwise servers
- might intermittently disappear from the list even though they are still
- broadcasting due to slight differences in timing. This parameter is
- optional, the default value is <literal>10000</literal> milliseconds (10
- seconds).</para>
+ <para><literal>refresh-timeout</literal>。这个参数决定了在收到某个服务器的广播后,需要等待
+ 多长时间下一次广播必须收到,否则将该服务器的连接器对从列表中删除。通常这个参数的值应该远大于
+ 广播组的<literal>broadcast-period</literal>,否则会使服务器信息由于小的时间差异而丢失。
+ 这个参数是可选的,它的默认值是<literal>10000</literal>毫秒(10秒)。</para>
</listitem>
</itemizedlist>
</section>
<section id="clusters-discovery.groups.clientside">
- <title>Discovery Groups on the Client Side</title>
- <para>Let's discuss how to configure a HornetQ client to use discovery to
- discover a list of servers to which it can connect. The way to do this differs
- depending on whether you're using JMS or the core API.</para>
+ <title>客户端的发现组</title>
+ <para>现在讨论如何配置HornetQ客户端来发现可以连接的服务器列表。使用JMS时所用的方法与使用核心接口时所用的
+ 方法有所不同。</para>
<section>
- <title>Configuring client discovery using JMS</title>
- <para>If you're using JMS and you're also using the JMS Service on the server to
- load your JMS connection factory instances into JNDI, then you can specify which
- discovery group to use for your JMS connection factory in the server side xml
- configuration <literal>hornetq-jms.xml</literal>. Let's take a look at an
- example:</para>
+ <title>使用JMS时客户端发现的配置方法</title>
+ <para>如果使用JMS,并且在服务器端的JMS连接工厂是注册到JNDI的情况下,你可以在服务器端的配置文件
+ <literal>hornetq-jms.xml</literal>中指定连接工厂所用的发现组。如下面所示:</para>
<programlisting><connection-factory name="ConnectionFactory">
<discovery-group-ref discovery-group-name="my-discovery-group"/>
<entries>
<entry name="ConnectionFactory"/>
</entries>
</connection-factory></programlisting>
- <para>The element <literal>discovery-group-ref</literal> specifies the name of a
- discovery group defined in <literal>hornetq-configuration.xml</literal>.</para>
- <para>When this connection factory is downloaded from JNDI by a client application
- and JMS connections are created from it, those connections will be load-balanced
- across the list of servers that the discovery group maintains by listening on
- the multicast address specified in the discovery group configuration.</para>
- <para>If you're using JMS, but you're not using JNDI to lookup a connection factory
- - you're instantiating the JMS connection factory directly then you can specify
- the discovery group parameters directly when creating the JMS connection
- factory. Here's an
- example:<programlisting>final String groupAddress = "231.7.7.7";
+ <para>其中<literal>discovery-group-ref</literal>的值是定义在
+ <literal>hornetq-configuration.xml</literal>文件中的一个发现组。</para>
+ <para>当连接工厂从JNDI下载到客户端时,使用它创建连接就会在列表中的服务器间进行负载均衡。
+ 客户端通过监听发现组中的广播地址可以不断更新这个服务器列表。</para>
+ <para>如果使用JMS但是不用JNDI,而是直接实例化JMS的连接工厂的话,可以用适当的方法来设置发现组的各个
+ 参数。如下所示:<programlisting>final String groupAddress = "231.7.7.7";
final int groupPort = 9876;
@@ -247,23 +186,17 @@
Connection jmsConnection1 = jmsConnectionFactory.createConnection();
Connection jmsConnection2 = jmsConnectionFactory.createConnection();</programlisting></para>
- <para>The <literal>refresh-timeout</literal> can be set directly on the connection
- factory by using the setter method <literal>setDiscoveryRefreshTimeout() if you
- want to change the default value.</literal></para>
- <para>There is also a further parameter settable on the connection factory using the
- setter method <literal>setInitialWaitTimeout()</literal>. If the connection
- factory is used immediately after creation then it may not have had enough time
- to received broadcasts from all the nodes in the cluster. On first usage, the
- connection factory will make sure it waits this long since creation before
- creating the first connection. The default value for this parameter is <literal
- >2000</literal> milliseconds.</para>
+ <para><literal>refresh-timeout</literal>参数可以直接在连接工厂上使用
+ <literal>setDiscoveryRefreshTimeout()</literal>方法设置。</para>
+ <para>连接工厂还有一个方法<literal>setInitialWaitTimeout()</literal>。它可以设置连接工厂的
+ 初始等待时间。当一个连接工厂被创建后立即进行用于创建连接的话,连接工厂可能没有足够的时间来接收各
+ 个服务器发出的广播信息,也就无法建立完整的服务器列表。有了这个参数,连接工厂会在首次创建连接时
+ 等待一定的时间,以接收广播。默认值是<literal>2000</literal>毫秒。</para>
</section>
<section>
- <title>Configuring client discovery using Core</title>
- <para>If you're using the core API to directly instantiate <literal
- >ClientSessionFactory</literal> instances, then you can specify the
- discovery group parameters directly when creating the session factory. Here's an
- example:
+ <title>使用核心API的客户端的配置</title>
+ <para>如果使用核心接口直接创建<literal>ClientSessionFactory</literal>的实例,可以使用相应的方法
+ 直接进行参数的设置,如:
<programlisting>
final String groupAddress = "231.7.7.7";
final int groupPort = 9876;
@@ -273,54 +206,37 @@
</programlisting>
</para>
- <para>The <literal>refresh-timeout</literal> can be set directly on the session
- factory by using the setter method <literal>setDiscoveryRefreshTimeout() if you
- want to change the default value.</literal></para>
- <para>There is also a further parameter settable on the session factory using the
- setter method <literal>setInitialWaitTimeout()</literal>. If the session factory
- is used immediately after creation then it may not have had enough time to
- received broadcasts from all the nodes in the cluster. On first usage, the
- session factory will make sure it waits this long since creation before creating
- the first session. The default value for this parameter is <literal
- >2000</literal> milliseconds.</para>
+ <para>方法<literal>setDiscoveryRefreshTimeout()</literal>可以用来直接设置参数
+ <literal>refresh-timeout</literal>。</para>
+ <para>会话工厂还有一个方法<literal>setInitialWaitTimeout()</literal>。它可以设置会话工厂的
+ 初始等待时间。当一个会话工厂被创建后立即进行用于创建连接的话,该会话工厂可能没有足够的时间来接收各
+ 个服务器发出的广播信息,也就无法建立完整的服务器列表。有了这个参数,会话工厂会在首次创建连接时
+ 等待一定的时间,以接收广播。默认值是<literal>2000</literal>毫秒。</para>
</section>
</section>
</section>
<section>
- <title>Server-Side Message Load Balancing</title>
- <para>If cluster connections are defined between nodes of a cluster, then HornetQ
- will load balance messages arriving at a particular node from a client.</para>
- <para>Let's take a simple example of a cluster of four nodes A, B, C, and D arranged in a
- <emphasis>symmetric cluster</emphasis> (described in <xref linkend="symmetric-cluster" />).
- We have a queue called
- <literal>OrderQueue</literal> deployed on each node of the cluster.</para>
- <para>We have client Ca connected to node A, sending orders to the server. We have also have
- order processor clients Pa, Pb, Pc, and Pd connected to each of the nodes A, B, C, D. If
- no cluster connection was defined on node A, then as order messages arrive on node A
- they will all end up in the <literal>OrderQueue</literal> on node A, so will only get
- consumed by the order processor client attached to node A, Pa.</para>
- <para>If we define a cluster connection on node A, then as ordered messages arrive on node A
- instead of all of them going into the local <literal>OrderQueue</literal> instance, they
- are distributed in a round-robin fashion between all the nodes of the cluster. The
- messages are forwarded from the receiving node to other nodes of the cluster. This is
- all done on the server side, the client maintains a single connection to node A.</para>
- <para>For example, messages arriving on node A might be distributed in the following order
- between the nodes: B, D, C, A, B, D, C, A, B, D. The exact order depends on the order
- the nodes started up, but the algorithm used is round robin.</para>
- <para>HornetQ cluster connections can be configured to always blindly load balance
- messages in a round robin fashion irrespective of whether there are any matching
- consumers on other nodes, but they can be a bit cleverer than that and also be
- configured to only distribute to other nodes if they have matching consumers. We'll look
- at both these cases in turn with some examples, but first we'll discuss configuring
- cluster connections in general.</para>
+ <title>服务器端消息的负载均衡</title>
+ <para>如果集群和各节点间定义了集群连接,HornetQ可以对到达一个节点的消息进行负载均衡。</para>
+ <para>举一个简单的例子。一个集群有4个节点,分别称为节点A、B、C和节点D。它们组成了一个
+ <emphasis>对称式集群</emphasis>(有关对称式集群参见<xref linkend="symmetric-cluster" />)。
+ 在每个节点上部署了一个名为<literal>OrderQueue</literal>的队列。</para>
+ <para>一个客户端Ca连接到节点A并向其发送订单消息。客户端Pa、Pb、Pc和Pd分别连接到节点A、B、C和D并接收处理
+ 这些订单消息。如果在节点A中没有定义集群连接,那么订单消息都发送到节点A中的队列<literal>OrderQueue</literal>
+ 中。因此只有连接到节点A的客户端Pa才能接收到订单消息。</para>
+ <para>如果在节点A定义了集群连接的话,发送到节点A的消息被轮流(round-robin)从节点A分配到各个节点上的
+ <literal>OrderQueue</literal>队列中。这种消息分配完全在服务器端完成,客户端只向节点A发送消息。</para>
+ <para>例如到达节点A的消息可能以下列顺序进行分配:B、D、C、A、B、D、C、A、B、D。具体的顺序取决于节点启动的
+ 先后,但是其算法是不变的(即round-robin)。</para>
+ <para>HornetQ集群连接在进行消息负载均衡时,可以配置成统一负载均衡模式,即不管各个节点上有无合适的接收者,一律在
+ 所有节点间进行消息的分配。也可以配置成为智能负载均衡模式,即只将消息分配到有合适接收者的节点上。这两种模式我们
+ 都将举例说明。首先我们先介绍一般的集群连接配置。</para>
<section id="clusters.cluster-connections">
- <title>Configuring Cluster Connections</title>
- <para>Cluster connections group servers into clusters so that messages can be load
- balanced between the nodes of the cluster. Let's take a look at a typical cluster
- connection. Cluster connections are always defined in <literal
- >hornetq-configuration.xml</literal> inside a <literal>cluster-connection</literal>
- element. There can be zero or more cluster connections defined per HornetQ
- server.</para>
+ <title>配置集群连接</title>
+ <para>集群连接将一组服务器连接成为一个集群,消息可以在集群的节点之间进行负载均衡。集群连接的配置在
+ <literal>hornetq-configuration.xml</literal>文件中的
+ <literal>cluster-connection</literal>内。一个HornetQ服务器可以有零个或多个集群连接。
+ 下面是一个典型的例子:</para>
<programlisting>
<cluster-connections>
<cluster-connection name="my-cluster">
@@ -333,140 +249,96 @@
</cluster-connection>
</cluster-connections>
</programlisting>
- <para>In the above cluster connection all parameters have been explicitly specified. In
- practice you might use the defaults for some.</para>
+ <para>上面给出了集群连接的所有可配置参数。在实际应用中有些你可以使用默认值,不必全部给出。</para>
<itemizedlist>
<listitem>
- <para><literal>address</literal>. Each cluster connection only applies to
- messages sent to an address that starts with this value.</para>
- <para>In this case, this cluster connection will load balance messages sent to
- address that start with <literal>jms</literal>. This cluster connection,
- will, in effect apply to all JMS queue and topic subscriptions since they
- map to core queues that start with the substring "jms".</para>
- <para>The address can be any value and you can have many cluster connections
- with different values of <literal>address</literal>, simultaneously
- balancing messages for those addresses, potentially to different clusters of
- servers. By having multiple cluster connections on different addresses a
- single HornetQ Server can effectively take part in multiple clusters
- simultaneously.</para>
- <para>By careful not to have multiple cluster connections with overlapping
- values of <literal>address</literal>, e.g. "europe" and "europe.news" since
- this could result in the same messages being distributed between more than
- one cluster connection, possibly resulting in duplicate deliveries. </para>
- <para>This parameter is mandatory.</para>
+ <para><literal>address</literal>。每个集群连接只服务于发送到以这个参数的值为开始的
+ 地址的消息。</para>
+ <para>本例中的集群连接只对发往以<literal>jms</literal>为开始的地址的消息进行负载均衡的
+ 处理。这个集群连接实际上能够处理所有JMS队列和话题的订阅中的消息,这是国为所有JMS的队列
+ 或订阅都映射到内核中以“jms“开头的队列。</para>
+ <para>这个地址可以为任何值,而且可以配置多个集群连接,每个连接的地址值可以不同。这样HornetQ
+ 可以同时对不同地址进行消息的负载均衡。有的地址甚至可能在其它集群的节点中。这也就意味着
+ 一个HornetQ服务器可以同时参与到多个集群中。</para>
+ <para>要注意别造成多个集群连接的地址互相重复。比如,地址“europe“和”europe.news“就互相重复,
+ 就会造成同一个消息会被多个集群连接进行分配,这样有可能发生重复传递。</para>
+ <para>本参数是必须指定的。</para>
</listitem>
<listitem>
- <para><literal>retry-interval</literal>. We mentioned before that, internally,
- cluster connections cause bridges to be created between the nodes of the
- cluster. If the cluster connection is created and the target node has not
- been started, or say, is being rebooted, then the cluster connections from
- other nodes will retry connecting to the target until it comes back up, in
- the same way as a bridge does.</para>
- <para>This parameter determines the interval in milliseconds between retry
- attempts. It has the same meaning as the <literal>retry-interval</literal>
- on a bridge (as described in <xref linkend="core-bridges" />).</para>
- <para>This parameter is optional and its default value is <literal>500</literal>
- milliseconds.</para>
+ <para><literal>retry-interval</literal>。如前所述,一个集群连接实际上在内部是用桥将两
+ 个节点连接起来。如果集群连接已经创建但是目的节点还未启动,或正在重启,这时集群连接就会不断
+ 重试与这个节点的连接,直到节点启动完毕连接成功为止。</para>
+ <para>这个参数决定了两次重试之间的时间间隔,单位是毫秒。它与桥的参数<literal>retry-interval</literal>
+ 的含义相同(参见<xref linkend="core-bridges" />)。</para>
+ <para>这个参数是可选的,默认值是<literal>500</literal>毫秒。</para>
</listitem>
<listitem>
- <para><literal>use-duplicate-detection</literal>. Internally cluster connections
- use bridges to link the nodes, and bridges can be configured to add a
- duplicate id property in each message that is forwarded. If the target node
- of the bridge crashes and then recovers, messages might be resent from the
- source node. By enabling duplicate detection any duplicate messages will be
- filtered out and ignored on receipt at the target node.</para>
- <para>This parameter has the same meaning as <literal
- >use-duplicate-detection</literal> on a bridge. For more information on
- duplicate detection, please see <xref linkend="duplicate-detection"
- />.</para>
- <para>This parameter is optional and has a default value of <literal
- >true</literal>.</para>
+ <para><literal>use-duplicate-detection</literal>。集群连接使用桥来连接各节点,而桥可以
+ 通过配置向每个转发的消息添加一个重复id的属性。如果目的节点崩溃并重启,消息可以被重新发送。
+ 重复检测的功能就是在这种情况下将重复发送的消息进行过滤并丢弃。</para>
+ <para>这个参数与桥的参数<literal
+ >use-duplicate-detection</literal>相同。关于重复检测的更多信息,请参见
+ <xref linkend="duplicate-detection"/>。</para>
+ <para>这参数是可选的,默认值是<literal>true</literal>。</para>
</listitem>
<listitem>
- <para><literal>forward-when-no-consumers</literal>. This parameter determines
- whether messages will be distributed round robin between other nodes of the
- cluster <emphasis>irrespective</emphasis> of whether there are matching or
- indeed any consumers on other nodes. </para>
- <para>If this is set to <literal>true</literal> then each incoming message will
- be round robin'd even though the same queues on the other nodes of the
- cluster may have no consumers at all, or they may have consumers that have
- non matching message filters (selectors). Note that HornetQ will
- <emphasis>not</emphasis> forward messages to other nodes if there are no
- <emphasis>queues</emphasis> of the same name on the other nodes, even if
- this parameter is set to <literal>true</literal>.</para>
- <para>If this is set to <literal>false</literal> then HornetQ will only
- forward messages to other nodes of the cluster if the address to which they
- are being forwarded has queues which have consumers, and if those consumers
- have message filters (selectors) at least one of those selectors must match
- the message.</para>
- <para>This parameter is optional, the default value is <literal
- >false</literal>.</para>
+ <para><literal>forward-when-no-consumers</literal>。这个参数决定了是否向没有合适接收者
+ 的节点分配消息。即不管有没有合适的接收者,消息在所有的节点间轮流分配。</para>
+ <para>如果这个参数设为<literal>true</literal>,则消息就会轮流在每个节点间分配,不管是否
+ 节点上有没有相应的接收者(或者有接收者但是具有不匹配的选择器)。注意,如果其它节点中没有
+ 与本节点同名的队列,HornetQ不会将消息转发到那些节点中去,不受本参数的限制。</para>
+ <para>如果参数设为<literal>false</literal>, HornetQ只将消息转发到集群中那些有着适合接收者
+ 的节点中。如果接收者有选择器,则至少有一个选择器与所转发的消息匹配才可,否则不转发。</para>
+ <para>本参数是可选的,默认值是<literal>false</literal>。</para>
</listitem>
<listitem>
- <para><literal>max-hops</literal>. When a cluster connection decides the set of
- nodes to which it might load balance a message, those nodes do not have to
- be directly connected to it via a cluster connection. HornetQ can be
- configured to also load balance messages to nodes which might be connected
- to it only indirectly with other HornetQ servers as intermediates in
- a chain.</para>
- <para>This allows HornetQ to be configured in more complex topologies
- and still provide message load balancing. We'll discuss this more later in
- this chapter.</para>
- <para>The default value for this parameter is <literal>1</literal>, which means
- messages are only load balanced to other HornetQ serves which are
- directly connected to this server. This parameter is optional.</para>
+ <para><literal>max-hops</literal>。当一个集群连接在确定进行消息负载均衡的节点组时,这些
+ 节点不一定是与本节点直接相连的节点。HornetQ可以通过其它HornetQ节点作为中介向那些非直接相
+ 连的节点转发消息。</para>
+ <para>这样可以使HornetQ组成更加复杂的拓扑结构并且仍可提供消息的负载均衡。在本章的后面我们还要作
+ 进一步的讨论。</para>
+ <para>本参数是可选参数,它的默认值是 <literal>1</literal>,表示消息只向直接相连的节点进行负载均衡。</para>
</listitem>
<listitem>
- <para><literal>discovery-group-ref</literal>. This parameter determines which
- discovery group is used to obtain the list of other servers in the cluster
- that this cluster connection will make connections to.</para>
+ <para><literal>discovery-group-ref</literal>。这个参数决定了使用哪个发现组来获得集群服务器的列表。
+ 集群连接与列表中的服务器建立连接。</para>
</listitem>
</itemizedlist>
</section>
<section id="clusters.clusteruser">
- <title>Cluster User Credentials</title>
+ <title>集群用户的安全信息</title>
- <para>When creating connections between nodes of a cluster to form a cluster connection,
- HornetQ uses a cluster user and cluster password which is defined in <literal>hornetq-configuration.xml</literal>:</para>
+ <para>当集群中两个节点建立连接时,HornetQ使用一个集群用户和集群密码。它们定义在
+ <literal>hornetq-configuration.xml</literal>文件中:</para>
<programlisting>
<cluster-user>HORNETQ.CLUSTER.ADMIN.USER</cluster-user>
<cluster-password>CHANGE ME!!</cluster-password>
</programlisting>
- <warning><para>It is imperative that these values are changed from their default, or remote clients will be able to make connections
- to the server using the default values. If they are not
- changed from the default, HornetQ will detect this and pester you with a warning on every
- start-up.</para></warning>
+ <warning><para>强烈建议在实际应用中不要使用默认的值,否则任意远程客户端会使用这些默认值连接到服务器上。当使用默认值时,
+ HornetQ会检测到并在每次启动的时候给出警告。</para></warning>
</section>
</section>
<section id="clusters.client.loadbalancing">
- <title>Client-Side Load balancing</title>
- <para>With HornetQ client-side load balancing, subsequent
- sessions created using a single session factory can be connected to different nodes of the
- cluster. This allows sessions to spread smoothly across the nodes of a cluster and
- not be "clumped" on any particular node.</para>
- <para>The load balancing policy to be used by the client factory is configurable. HornetQ
- provides two out-of-the-box load balancing policies and you can also implement
- your own and use that.</para>
- <para>The out-of-the-box policies are</para>
+ <title>客户端负载均衡</title>
+ <para>HornetQ的客户端负载均衡使同一个会话工厂每次创建一个会话时,都连接到集群不同的节点上。这样可以使所有的会话
+ 均匀分布在集群的各个节点上,而不会“拥挤”到某一个节点上。</para>
+ <para>客户端负载均衡的策略是可配置的。HornetQ提供两种现成的负载均衡策略。你也可以实现自己的策略。</para>
+ <para>两种现成的策略是:</para>
<itemizedlist>
<listitem>
- <para>Round Robin. With this policy the first node is chosen randomly then each
- subsequent node is chosen sequentially in the same order.</para>
- <para>For example nodes might be chosen in the order B, C, D, A, B, C, D, A, B or D,
- A, B, C, A, B, C, D, A or C, D, A, B, C, D, A, B, C, D, A.</para>
+ <para>轮流策略(Round Robin)。这个策略是先随机选择一个节点作为第一个节点,然后依次选择各个节点。</para>
+ <para>例如一个顺序可能是 B, C, D, A, B, C, D, A, B,另一个也可能是 D,
+ A, B, C, D, A, B, C, D, A 或者 C, D, A, B, C, D, A, B, C, D, A等等。</para>
</listitem>
<listitem>
- <para>Random. With this policy each node is chosen randomly.</para>
+ <para>随机策略。每次都是随机选择一个节点来建立会话。</para>
</listitem>
</itemizedlist>
- <para>You can also implement your own policy by implementing the interface <literal
- >org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy</literal></para>
- <para>Specifying which load balancing policy to use differs whether you are using JMS or the
- core API. If you don't specify a policy then the default will be used which is <literal
+ <para>你可以实现自己的策略。只需要实现接口<literal
+ >org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy</literal>即可。</para>
+ <para>根据你使用的是JMS还是核心接口,指定负载均衡的方法是有所不同的。如果你不指定策略,默认的策略是<literal
>org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy</literal>.</para>
- <para>If you're using JMS, and you're using JNDI on the server to put your JMS connection
- factories into JNDI, then you can specify the load balancing policy directly in the
- <literal>hornetq-jms.xml</literal> configuration file on the server as follows:
+ <para>如果使用的是JMS,并且JMS连接工厂注册到JNDI,则你需要在<literal>hornetq-jms.xml</literal>文件中定义策略,如:
<programlisting>
<connection-factory name="ConnectionFactory">
<discovery-group-ref discovery-group-name="my-discovery-group"/>
@@ -477,49 +349,39 @@
org.hornetq.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy
</connection-load-balancing-policy-class-name>
</connection-factory>
- </programlisting>The
- above example would deploy a JMS connection factory that uses the random connection load
- balancing policy. </para>
- <para>If you're using JMS but you're instantiating your connection factory directly on the
- client side then you can set the load balancing policy using the setter on the <literal
- >HornetQConnectionFactory</literal> before using it:
+ </programlisting>上面的配置将部署一个连接工厂,它的连接负载均衡策略是随机策略。</para>
+ <para>如果使用JMS,但是你在客户端是直接创建连接工厂的实例,那么你需要用相应的方法在<literal
+ >HornetQConnectionFactory</literal>上直接设置:
<programlisting>
ConnectionFactory jmsConnectionFactory = HornetQJMSClient.createConnectionFactory(...);
jmsConnectionFactory.setLoadBalancingPolicyClassName("com.acme.MyLoadBalancingPolicy");
</programlisting></para>
- <para>If you're using the core API, you can set the load balancing policy directly on the
- <literal>ClientSessionFactory</literal> instance you are using:
+ <para>如果你使用核心接口的话,你要直接在<literal>ClientSessionFactory</literal>上设置策略:
<programlisting>
ClientSessionFactory factory = HornetQClient.createClientSessionFactory(...);
factory.setLoadBalancingPolicyClassName("com.acme.MyLoadBalancingPolicy");
</programlisting></para>
- <para>The set of servers over which the factory load balances can be determined in one of
- two ways:</para>
+ <para>连接工厂进行负载均衡的服务器组可以有两种方法来确定:</para>
<itemizedlist>
<listitem>
- <para>Specifying servers explicitly</para>
+ <para>显式指定服务器</para>
</listitem>
<listitem>
- <para>Using discovery.</para>
+ <para>使用发现组功能</para>
</listitem>
</itemizedlist>
</section>
<section>
- <title>Specifying Members of a Cluster Explicitly</title>
- <para>Sometimes UDP is not enabled on a network so it's not possible to use UDP server
- discovery for clients to discover the list of servers in the cluster, or for servers to
- discover what other servers are in the cluster.</para>
- <para>In this case, the list of servers in the cluster can be specified explicitly on each
- node and on the client side. Let's look at how we do this:</para>
+ <title>显式指定集群服务器</title>
+ <para>有的网络并不开放UDP,所以客户端不能使用UDP服务器发现功能来获得集群中的服务器列表,服务器之间也无法相互发现。</para>
+ <para>在这种情况下,可以显式地在每个节点或客户端指定服务器的列表。下面介绍如何做:</para>
<section>
- <title>Specify List of Servers on the Client Side</title>
- <para>This differs depending on whether you're using JMS or the Core API</para>
+ <title>在客户端指定服务器列表</title>
+ <para>根据使用的是JMS还是核心接口,所用的方法也不同。</para>
<section>
- <title>Specifying List of Servers using JMS</title>
- <para>If you're using JMS, and you're using the JMS Service to load your JMS
- connection factory instances directly into JNDI on the server, then you can
- specify the list of servers in the server side configuration file <literal
- >hornetq-jms.xml</literal>. Let's take a look at an example:</para>
+ <title>使用JMS时指定服务器列表</title>
+ <para>如果使用JMS,并且JMS连接工厂是注册到JNDI的话,你需要在服务器端的配置文件
+ <literal>hornetq-jms.xml</literal>中来指定,如下面的例子:</para>
<programlisting><connection-factory name="ConnectionFactory">
<connectors>
<connector-ref connector-name="my-connector1"
@@ -533,23 +395,17 @@
<entry name="ConnectionFactory"/>
</entries>
</connection-factory></programlisting>
- <para>The <literal>connection-factory</literal> element can contain zero or more
- <literal>connector-ref</literal> elements, each one of which specifies a
- <literal>connector-name</literal> attribute and an optional <literal
- >backup-connector-name</literal> attribute. The <literal
- >connector-name</literal> attribute references a connector defined in
- <literal>hornetq-configuration.xml</literal> which will be used as a live
- connector. The <literal>backup-connector-name</literal> is optional, and if
- specified it also references a connector defined in <literal
- >hornetq-configuration.xml</literal>. For more information on connectors please
- see <xref linkend="configuring-transports" />.</para>
- <para>The connection factory thus maintains a list of [connector, backup connector]
- pairs, these pairs are then used by the client connection load balancing policy
- on the client side when creating connections to the cluster.</para>
- <para>If you're using JMS but you're not using JNDI then you can also specify the
- list of [connector, backup connector] pairs directly when instantiating the
- <literal>HornetQConnectionFactory</literal>, here's an
- example:<programlisting>List<Pair<TransportConfiguration, TransportConfiguration>> serverList =
+ <para>其中的<literal>connection-factory</literal>内可以包含零或多个
+ <literal>connector-ref</literal>。每个<literal>connector-ref</literal>
+ 都拥有<literal>connector-name</literal>属性和一个可选的<literal
+ >backup-connector-name</literal>属性。<literal
+ >connector-name</literal> 属性指向的是一个在<literal>hornetq-configuration.xml</literal>
+ 文件中定义的连接器。而<literal>backup-connector-name</literal>属性也是指向在
+ <literal>hornetq-configuration.xml</literal>文件中定义的一个连接器。
+ 有关连接器更多的信息参见<xref linkend="configuring-transports" />。</para>
+ <para>连接工厂这样就保存有一组[连接器, 备份连接器]对,用于客户端在创建连接时的负载均衡。</para>
+ <para>如果你使用JMS,但不使用JNDI,你可以直接创建<literal>HornetQConnectionFactory</literal>
+ 的实例,然后用相应的方法来设定连接器对列表,如下例:<programlisting>List<Pair<TransportConfiguration, TransportConfiguration>> serverList =
new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
serverList.add(new Pair<TransportConfiguration,
@@ -564,20 +420,15 @@
Connection jmsConnection1 = jmsConnectionFactory.createConnection();
Connection jmsConnection2 = jmsConnectionFactory.createConnection();</programlisting></para>
- <para>In the above snippet we create a list of pairs of <literal
- >TransportConfiguration</literal> objects. Each <literal
- >TransportConfiguration</literal> object contains knowledge of how to make a
- connection to a specific server.</para>
- <para>A <literal>HornetQConnectionFactory</literal> instance is then created passing
- the list of servers in the constructor. Any connections subsequently created by
- this factory will create connections according to the client connection load
- balancing policy applied to that list of servers.</para>
+ <para>上面的代码中我们创建了一组<literal>TransportConfiguration</literal>对象。每个
+ <literal>TransportConfiguration</literal>对象包括了如何连接某个特定服务器的信息。</para>
+ <para>然后,使用这个服务器列表创建了一个<literal>HornetQConnectionFactory</literal>实例。
+ 这样通过这个工厂创建的连接就可以使用这个列表,由所用的客户连接负载均衡策略来进行连接的负载均衡。</para>
</section>
<section>
- <title>Specifying List of Servers using the Core API</title>
- <para>If you're using the core API you can also specify the list of servers directly
- when creating the <literal>ClientSessionFactory</literal> instance. Here's an
- example:</para>
+ <title>使用核心接口指定服务器列表</title>
+ <para>如果使用核心接口,你可以直接在<literal>ClientSessionFactory</literal>实例上设置服务器列表。
+ 如下例:</para>
<programlisting>List<Pair<TransportConfiguration, TransportConfiguration>> serverList =
new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
@@ -593,24 +444,18 @@
ClientSession sesison1 = factory.createClientSession(...);
ClientSession session2 = factory.createClientSession(...);</programlisting>
- <para>In the above snippet we create a list of pairs of <literal
- >TransportConfiguration</literal> objects. Each <literal
- >TransportConfiguration</literal> object contains knowledge of how to make a
- connection to a specific server. For more information on this, please see
- <xref linkend="configuring-transports" />.</para>
- <para>A <literal>ClientSessionFactoryImpl</literal> instance is then created passing
- the list of servers in the constructor. Any sessions subsequently created by
- this factory will create sessions according to the client connection load
- balancing policy applied to that list of servers.</para>
+ <para>在上面的代码中我们创建了一组<literal>TransportConfiguration</literal>对象。每个
+ <literal>TransportConfiguration</literal>对象包括了如何连接某个特定服务器的信息。
+ 有关信息请参见<xref linkend="configuring-transports" />。</para>
+ <para>然后,使用这个服务器列表创建了一个<literal>ClientSessionFactoryImpl</literal>实例。
+ 这样通过这个工厂创建的会话就可以使用这个列表,由所用的客户连接负载均衡策略来进行会话的负载均衡。</para>
</section>
</section>
<section id="clusters.static.servers">
- <title>Specifying List of Servers to form a Cluster</title>
- <para>Let's take a look at an example where each cluster connection is defined for a
- symmetric cluster, but we're not using discovery for each node to discover its
- neighbours, instead we'll configure each cluster connection to have explicit
- knowledge of all the other nodes in the cluster.</para>
- <para>Here's an example cluster connection definition showing that:</para>
+ <title>指定服务器列表以组成集群</title>
+ <para>下面我们考虑一个对称集群的例子,我们配置了每个集群连接,但是不使用发现功能来获得服务器信息。我们
+ 采用配置的方法来显式指定集群的所有成员。</para>
+ <para>下面就是一个集群连接的配置:</para>
<programlisting><cluster-connections>
<cluster-connection name="my-explicit-cluster">
<address>jms</address>
@@ -622,115 +467,80 @@
backup-connector-name="my-backup-connector3"/>
</cluster-connection>
</cluster-connections></programlisting>
- <para>The <literal>cluster-connection</literal> element can contain zero or more
- <literal>connector-ref</literal> elements, each one of which specifies a
- <literal>connector-name</literal> attribute and an optional <literal
- >backup-connector-name</literal> attribute. The <literal
- >connector-name</literal> attribute references a connector defined in <literal
- >hornetq-configuration.xml</literal> which will be used as a live connector. The
- <literal>backup-connector-name</literal> is optional, and if specified it also
- references a connector defined in <literal>hornetq-configuration.xml</literal>. For more
- information on connectors please see <xref linkend="configuring-transports" />.</para>
+ <para><literal>cluster-connection</literal>中可以包括零或多个<literal>connector-ref</literal>,
+ 每个<literal>connector-ref</literal>都有一个<literal>connector-name</literal>属性和
+ 一个可选的<literal>backup-connector-name</literal>属性。<literal
+ >connector-name</literal>属性指向一个在<literal
+ >hornetq-configuration.xml</literal>文件中定义的一个连接器,它是主连接器。可选的
+ <literal>backup-connector-name</literal>指向的也是在
+ <literal>hornetq-configuration.xml</literal>文件中定义的一个连接器。
+ 有关连接器的详细信息参见<xref linkend="configuring-transports" />。</para>
<note>
- <para>Due to a limitation in HornetQ 2.0.0, failover is not supported for clusters
- defined using a static set of nodes. To support failover over cluster nodes, they
- must be configured to use a discovery group.</para>
+ <para>由于HornetQ 2.0.0的限制,使用静态节点列表的集群不支持失效备援(failover)。要想支持失效备援,
+ 就必须使用发现组。</para>
</note>
</section>
</section>
<section id="clusters.message-redistribution">
- <title>Message Redistribution</title>
- <para>Another important part of clustering is message redistribution. Earlier we learned how
- server side message load balancing round robins messages across the cluster. If <literal
- >forward-when-no-consumers</literal> is false, then messages won't be forwarded to
- nodes which don't have matching consumers, this is great and ensures that messages don't
- arrive on a queue which has no consumers to consume them, however there is a situation
- it doesn't solve: What happens if the consumers on a queue close after the messages have
- been sent to the node? If there are no consumers on the queue the message won't get
- consumed and we have a <emphasis>starvation</emphasis> situation.</para>
- <para>This is where message redistribution comes in. With message redistribution HornetQ
- can be configured to automatically <emphasis>redistribute</emphasis> messages
- from queues which have no consumers back to other nodes in the cluster which do have
- matching consumers.</para>
- <para>Message redistribution can be configured to kick in immediately after the last
- consumer on a queue is closed, or to wait a configurable delay after the last consumer
- on a queue is closed before redistributing. By default message redistribution is
- disabled.</para>
- <para>Message redistribution can be configured on a per address basis, by specifying the
- redistribution delay in the address settings, for more information on configuring
- address settings, please see <xref linkend="queue-attributes" />.</para>
- <para>Here's an address settings snippet from <literal>hornetq-configuration.xml</literal>
- showing how message redistribution is enabled for a set of queues:</para>
+ <title>消息再分配</title>
+ <para>集群的另一个重要功能是消息的再分配。前面我们知道在服务器端可以对消息在集群节点间进行轮流方式的负载均衡。如果
+ <literal>forward-when-no-consumers</literal>参数为false,消息将不会转发到那些没有相应接收者的节点中。
+ 这样可以有效地避免消息被送到一个不可能被接收的节点上。但仍然有一个问题无法解决:就是如果在消息发到一个节点后,
+ 它的接收者被关闭,那么这些消息仍然不能被接收了,造成了一种消息<emphasis>匮乏</emphasis>情形。
+ 这种情况下如何处理?</para>
+ <para>这里就需要消息再分配功能。通过配置,HornetQ可以将没有接收者的队列中的消息<emphasis>再次分配</emphasis>
+ 到有接收者的节点上去。</para>
+ <para>通过配置,消息再分配可以在队列最后一个接收者关闭时立即进行,也可以配置成等待一段时间再进行。默认消息再分配功能是
+ 关闭的。</para>
+ <para>消息再分配功能可以基于地址进行配置,即在地址设置中指定再分配的延时。关于地址设置的更多信息,请参见
+ <xref linkend="queue-attributes" />。</para>
+ <para>下面是从<literal>hornetq-configuration.xml</literal>文件中提取的消息再分配的配置:</para>
<programlisting><address-settings>
<address-setting match="jms.#">
<redistribution-delay>0</redistribution-delay>
</address-setting>
</address-settings></programlisting>
- <para>The above <literal>address-settings</literal> block would set a <literal
- >redistribution-delay</literal> of <literal>0</literal> for any queue which is bound
- to an address that starts with "jms.". All JMS queues and topic subscriptions are bound
- to addresses that start with "jms.", so the above would enable instant (no delay)
- redistribution for all JMS queues and topic subscriptions.</para>
- <para>The attribute <literal>match</literal> can be an exact match or it can be a string
- that conforms to the HornetQ wildcard syntax (described in <xref linkend="wildcard-syntax"
- />).</para>
- <para>The element <literal>redistribution-delay</literal> defines the delay in milliseconds
- after the last consumer is closed on a queue before redistributing messages from that
- queue to other nodes of the cluster which do have matching consumers. A delay of zero
- means the messages will be immediately redistributed. A value of <literal>-1</literal>
- signifies that messages will never be redistributed. The default value is <literal
- >-1</literal>.</para>
- <para>It often makes sense to introduce a delay before redistributing as it's a common case
- that a consumer closes but another one quickly is created on the same queue, in such a
- case you probably don't want to redistribute immediately since the new consumer will
- arrive shortly.</para>
+ <para>上面<literal>address-settings</literal>中设置的<literal
+ >redistribution-delay</literal>值为<literal>0</literal>。它适用于所有以“jms.”开头的
+ 地址。由于所有JMS队列与话题订阅都绑定到以“jms.”开头的地址,所以上述配置的立即方式(没有延迟)消息
+ 再分配适用于所有的JMS队列和话题订阅。</para>
+ <para><literal>match</literal>属性可以是精确匹配,也可以使用通配符。通配符要符合HornetQ的通配符
+ 语法(在<xref linkend="wildcard-syntax"
+ />中描述)。</para>
+ <para><literal>redistribution-delay</literal>定义了队列最后一个接收者关闭后在进行消息再分配前所等待的
+ 时间,单位毫秒。如果其值是0,表示立即进行消息再分配。<literal>-1</literal>表示不会进行消息再分配。
+ 默认值是<literal>-1</literal>。</para>
+ <para>通常为消息再分配定义一个延迟是有实际意义的。很多时候当一个接收者被关闭时,很快就会有一个新的接收者被创建。
+ 在这种情况下加一个延迟可以使消息继续在本地进行接收,而不会将消息转发到别处。</para>
</section>
<section>
- <title>Cluster topologies</title>
- <para>HornetQ clusters can be connected together in many different topologies, let's
- consider the two most common ones here</para>
+ <title>集群拓扑结构</title>
+ <para>HornetQ集群可以有多种拓扑结构。我们来看两个最常见的结构。</para>
<section id="symmetric-cluster">
- <title>Symmetric cluster</title>
- <para>A symmetric cluster is probably the most common cluster topology, and you'll be
- familiar with if you've had experience of JBoss Application Server
- clustering.</para>
- <para>With a symmetric cluster every node in the cluster is connected to every other
- node in the cluster. In other words every node in the cluster is no more than one
- hop away from every other node.</para>
- <para>To form a symmetric cluster every node in the cluster defines a cluster connection
- with the attribute <literal>max-hops</literal> set to <literal>1</literal>.
- Typically the cluster connection will use server discovery in order to know what
- other servers in the cluster it should connect to, although it is possible to
- explicitly define each target server too in the cluster connection if, for example,
- UDP is not available on your network.</para>
- <para>With a symmetric cluster each node knows about all the queues that exist on all
- the other nodes and what consumers they have. With this knowledge it can determine
- how to load balance and redistribute messages around the nodes.</para>
+ <title>对称式集群</title>
+ <para>对称式集群可能是最常见的集群方式了。如果你接触过JBoss应用服务器的集群,你就对这种方式很熟悉。</para>
+ <para>在一个对称集群中,每一个节点都与集群中其它任一节点相连。换句话说,集群中任意两个节点的连接都
+ 只有一跳(hop)。</para>
+ <para>要组成一个对称式的集群,每个节点在定义集群连接时要将属性<literal>max-hops</literal>
+ 设为<literal>1</literal>。通常集群连接将使用服务器发现的功能来获得集群中其它服务器的连接
+ 信息。当然在UDP不可用的时候,也可以通过显式方式为集群连接指定服务器。</para>
+ <para>在对称集群中,每个服务器都知道集群中其它服务器中的所有队列信息,以及它们的接收者信息。利用这些
+ 信息它可以决定如何进行消息的负载均衡及消息再分配。</para>
</section>
<section>
- <title>Chain cluster</title>
- <para>With a chain cluster, each node in the cluster is not connected to every node in
- the cluster directly, instead the nodes form a chain with a node on each end of the
- chain and all other nodes just connecting to the previous and next nodes in the
- chain.</para>
- <para>An example of this would be a three node chain consisting of nodes A, B and C.
- Node A is hosted in one network and has many producer clients connected to it
- sending order messages. Due to corporate policy, the order consumer clients need to
- be hosted in a different network, and that network is only accessible via a third
- network. In this setup node B acts as a mediator with no producers or consumers on
- it. Any messages arriving on node A will be forwarded to node B, which will in turn
- forward them to node C where they can get consumed. Node A does not need to directly
- connect to C, but all the nodes can still act as a part of the cluster.</para>
- <para>To set up a cluster in this way, node A would define a cluster connection that
- connects to node B, and node B would define a cluster connection that connects to
- node C. In this case we only want cluster connections in one direction since we're
- only moving messages from node A->B->C and never from C->B->A.</para>
- <para>For this topology we would set <literal>max-hops</literal> to <literal
- >2</literal>. With a value of <literal>2</literal> the knowledge of what queues and
- consumers that exist on node C would be propagated from node C to node B to node A.
- Node A would then know to distribute messages to node B when they arrive, even
- though node B has no consumers itself, it would know that a further hop away is node
- C which does have consumers.</para>
+ <title>链式集群</title>
+ <para>在链式集群中,并不是每个节点都与其它任何节点直接相连,而是由两个节点组成头和尾,其余节点在中间连接
+ 成为一个链的结构。</para>
+ <para>比如有三个节点A、B和C。节点A在一个网络中,它有许多消息的发送者向它发送订单消息。由于公司的政策,订单
+ 的接收者需要在另一个网络中接收消息,并且这个网络需要经过其它第三个网络才可以访问。这种情况下我们将节点
+ B部署到第三个网络中,作为节点A与节点C的中间节点将两个节点连接起来。当消息到达节点A时,被转发到节点B,
+ 然后又被转发到节点C上,这样消息就被C上的接收者所接收。节点A不需要直接与节点C连接,但是所有三个节点仍然
+ 组成了一个集群。</para>
+ <para>要想组成一个这样的集群,节点A的集群连接要指向节点B,节点B的集群连接要指向C。本例我们只想组成一个单向
+ 的链式集群,即我们只将消息按节点A->B->C的方向流动,而不要向 C->B->A方向流动。</para>
+ <para>对于这种集群拓扑,我们需要将<literal>max-hops</literal>设为<literal
+ >2</literal>。这个值可以使节点C上队列的信息传送到节点B,再传送到节点A。因此节点A就知道消息到达时即将
+ 其转发给节点B。尽管节点B可能没有接收者,可它知道再经过一跳就可以将消息转到节点C,那里就有接收者了。</para>
</section>
</section>
</chapter>
14 years, 10 months
JBoss hornetq SVN: r9184 - in trunk: tests/src/org/hornetq/tests/integration/jms/bridge and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-04-29 10:06:37 -0400 (Thu, 29 Apr 2010)
New Revision: 9184
Modified:
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-377
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-04-28 23:13:26 UTC (rev 9183)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-04-29 14:06:37 UTC (rev 9184)
@@ -52,6 +52,7 @@
import org.hornetq.jms.bridge.JMSBridge;
import org.hornetq.jms.bridge.JMSBridgeControl;
import org.hornetq.jms.bridge.QualityOfServiceMode;
+import org.hornetq.jms.client.HornetQMessage;
import org.hornetq.jms.client.HornetQSession;
/**
@@ -110,7 +111,7 @@
private volatile boolean addMessageIDInHeader;
private boolean started;
-
+
private boolean stopping = false;
private final LinkedList<Message> messages;
@@ -142,7 +143,7 @@
private BatchTimeChecker timeChecker;
private ExecutorService executor;
-
+
private long batchExpiryTime;
private boolean paused;
@@ -310,7 +311,7 @@
public synchronized void start() throws Exception
{
stopping = false;
-
+
if (started)
{
JMSBridgeImpl.log.warn("Attempt to start, but is already started");
@@ -327,7 +328,7 @@
{
executor = createExecutor();
}
-
+
checkParams();
TransactionManager tm = getTm();
@@ -395,7 +396,7 @@
public synchronized void stop() throws Exception
{
stopping = true;
-
+
if (JMSBridgeImpl.trace)
{
JMSBridgeImpl.log.trace("Stopping " + this);
@@ -409,8 +410,8 @@
}
boolean ok = executor.awaitTermination(60, TimeUnit.SECONDS);
-
- if(!ok)
+
+ if (!ok)
{
throw new Exception("fail to stop JMS Bridge");
}
@@ -1235,7 +1236,7 @@
{
log.trace("Failed to connect bridge", e);
}
-
+
cleanup();
return false;
@@ -1665,7 +1666,17 @@
String propName = (String)entry.getKey();
- msg.setObjectProperty(propName, entry.getValue());
+ Object val = entry.getValue();
+
+ if (val instanceof byte[] == false)
+ {
+ //Can't set byte[] array props through the JMS API - if we're bridging a HornetQ message it might have such props
+ msg.setObjectProperty(propName, entry.getValue());
+ }
+ else if (msg instanceof HornetQMessage)
+ {
+ ((HornetQMessage)msg).getCoreMessage().putBytesProperty(propName, (byte[])val);
+ }
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java 2010-04-28 23:13:26 UTC (rev 9183)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java 2010-04-29 14:06:37 UTC (rev 9184)
@@ -35,6 +35,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.bridge.QualityOfServiceMode;
import org.hornetq.jms.bridge.impl.JMSBridgeImpl;
+import org.hornetq.jms.client.HornetQMessage;
/**
* A JMSBridgeTest
@@ -1005,10 +1006,20 @@
{
TextMessage tm = sessSource.createTextMessage("message" + i);
- // We add some headers to make sure they get passed through ok
+ // We add some properties to make sure they get passed through ok
tm.setStringProperty("wib", "uhuh");
tm.setBooleanProperty("cheese", true);
tm.setIntProperty("Sausages", 23);
+ tm.setByteProperty("bacon", (byte)12);
+ tm.setDoubleProperty("toast", 17261762.12121d);
+ tm.setFloatProperty("orange", 1212.1212f);
+ tm.setLongProperty("blurg", 817217827l);
+ tm.setShortProperty("stst", (short)26363);
+
+ //Set some JMS headers too
+
+ //And also set a core props
+ ((HornetQMessage)tm).getCoreMessage().putBytesProperty("bytes", new byte[] { 1, 2, 3});
// We add some JMSX ones too
@@ -1040,6 +1051,13 @@
Assert.assertEquals("uhuh", tm.getStringProperty("wib"));
Assert.assertTrue(tm.getBooleanProperty("cheese"));
Assert.assertEquals(23, tm.getIntProperty("Sausages"));
+ assertEquals((byte)12, tm.getByteProperty("bacon"));
+ assertEquals(17261762.12121d, tm.getDoubleProperty("toast"));
+ assertEquals(1212.1212f, tm.getFloatProperty("orange"));
+ assertEquals(817217827l, tm.getLongProperty("blurg"));
+ assertEquals((short)26363, tm.getShortProperty("stst"));
+
+ assertEqualsByteArrays(new byte[] { 1,2, 3}, ((HornetQMessage)tm).getCoreMessage().getBytesProperty("bytes"));
Assert.assertEquals("mygroup543", tm.getStringProperty("JMSXGroupID"));
@@ -1085,6 +1103,13 @@
Assert.assertEquals("uhuh", tm.getStringProperty("wib"));
Assert.assertTrue(tm.getBooleanProperty("cheese"));
Assert.assertEquals(23, tm.getIntProperty("Sausages"));
+ assertEquals((byte)12, tm.getByteProperty("bacon"));
+ assertEquals(17261762.12121d, tm.getDoubleProperty("toast"));
+ assertEquals(1212.1212f, tm.getFloatProperty("orange"));
+ assertEquals(817217827l, tm.getLongProperty("blurg"));
+ assertEquals((short)26363, tm.getShortProperty("stst"));
+
+ assertEqualsByteArrays(new byte[] { 1,2, 3}, ((HornetQMessage)tm).getCoreMessage().getBytesProperty("bytes"));
Assert.assertEquals("mygroup543", tm.getStringProperty("JMSXGroupID"));
@@ -1994,6 +2019,8 @@
{
return newTransactionManager();
}
+
+
// Inner classes -------------------------------------------------------------------
14 years, 10 months
JBoss hornetq SVN: r9183 - trunk/src/main/org/hornetq/core/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-04-28 19:13:26 -0400 (Wed, 28 Apr 2010)
New Revision: 9183
Modified:
trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java
Log:
just javadoc
Modified: trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2010-04-28 22:11:06 UTC (rev 9182)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2010-04-28 23:13:26 UTC (rev 9183)
@@ -32,6 +32,12 @@
boolean isSupportsCallbacks();
+ /**
+ * Note: You need to release the buffer if is used for reading operations.
+ * You don't need to do it if using writing operations (AIO Buffer Lister will take of writing operations)
+ * @param size
+ * @return
+ */
ByteBuffer newBuffer(int size);
void releaseBuffer(ByteBuffer buffer);
14 years, 10 months
JBoss hornetq SVN: r9182 - trunk/src/main/org/hornetq/jms/management/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-04-28 18:11:06 -0400 (Wed, 28 Apr 2010)
New Revision: 9182
Modified:
trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
Log:
Avoiding possible NPE
Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-04-28 15:57:32 UTC (rev 9181)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-04-28 22:11:06 UTC (rev 9182)
@@ -219,7 +219,10 @@
for (String queue : queues)
{
QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue);
- count += coreQueueControl.removeMessages(filter);
+ if (coreQueueControl != null)
+ {
+ count += coreQueueControl.removeMessages(filter);
+ }
}
return count;
14 years, 10 months