JBoss hornetq SVN: r9793 - in branches/Branch_New_Paging: tests/src/org/hornetq/tests/integration/paging and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-15 16:18:59 -0400 (Fri, 15 Oct 2010)
New Revision: 9793
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Transactions on Cursors
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-10-15 20:07:39 UTC (rev 9792)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-10-15 20:18:59 UTC (rev 9793)
@@ -207,6 +207,14 @@
{
rolledback = true;
committed = false;
+
+ if (lateDeliveries != null)
+ {
+ for (Pair<PageCursor, PagePosition> pos : lateDeliveries)
+ {
+ pos.a.positionIgnored(pos.b);
+ }
+ }
}
public String toString()
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-15 20:07:39 UTC (rev 9792)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-15 20:18:59 UTC (rev 9793)
@@ -381,9 +381,9 @@
PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
System.out.println("cursorProvider = " + cursorProvider);
+
+ PageCursor cursor = pageStore.getCursorProvier().getCursor(queue.getID());
- PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
-
System.out.println("Cursor: " + cursor);
StorageManager storage = this.server.getStorageManager();
@@ -392,14 +392,20 @@
PageTransactionInfoImpl pgtxForgotten = new PageTransactionInfoImpl(storage.generateUniqueID());
PageTransactionInfoImpl pgtxCommit = new PageTransactionInfoImpl(storage.generateUniqueID());
+ System.out.println("Forgetting tx " + pgtxForgotten.getTransactionID());
+
this.server.getPagingManager().addTransaction(pgtxRollback);
this.server.getPagingManager().addTransaction(pgtxCommit);
pgMessages(storage, pageStore, pgtxRollback, 0, NUM_MESSAGES, messageSize);
+ pageStore.forceAnotherPage();
pgMessages(storage, pageStore, pgtxForgotten, 100, NUM_MESSAGES, messageSize);
+ pageStore.forceAnotherPage();
pgMessages(storage, pageStore, pgtxCommit, 200, NUM_MESSAGES, messageSize);
+ pageStore.forceAnotherPage();
addMessages(300, NUM_MESSAGES, messageSize);
+ pageStore.forceAnotherPage();
// First consume what's already there without any tx as nothing was committed
@@ -414,6 +420,7 @@
assertNull(cursor.moveNext());
pgtxRollback.rollback();
+ this.server.getPagingManager().removeTransaction(pgtxRollback.getTransactionID());
pgtxCommit.commit();
// Second:after pgtxCommit was done
for (int i = 200; i < 300; i++)
13 years, 6 months
JBoss hornetq SVN: r9792 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-15 16:07:39 -0400 (Fri, 15 Oct 2010)
New Revision: 9792
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Transactions on Cursors
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-10-15 20:07:39 UTC (rev 9792)
@@ -14,6 +14,8 @@
package org.hornetq.core.paging;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.transaction.Transaction;
@@ -48,4 +50,13 @@
void increment();
int getNumberOfMessages();
+
+ /**
+ * This method will hold the position to be delivered later in case this transaction is pending.
+ * If the tx is not pending, it will return false, so the caller can deliver it right away
+ * @param cursor
+ * @param cursorPos
+ * @return true if the message will be delivered later, false if it should be delivered right away
+ */
+ boolean deliverAfterCommit(PageCursor cursor, PagePosition cursorPos);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java 2010-10-15 20:07:39 UTC (rev 9792)
@@ -65,6 +65,8 @@
Page createPage(final int page) throws Exception;
+ PagingManager getPagingManager();
+
PageCursorProvider getCursorProvier();
void processReload() throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-15 20:07:39 UTC (rev 9792)
@@ -46,6 +46,12 @@
void reloadACK(PagePosition position);
/**
+ * To be called when the cursor decided to ignore a position.
+ * @param position
+ */
+ void positionIgnored(PagePosition position);
+
+ /**
* To be used to avoid a redelivery of a prepared ACK after load
* @param position
*/
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-15 20:07:39 UTC (rev 9792)
@@ -57,7 +57,7 @@
*/
PageCursor createCursor();
- Pair<PagePosition, PagedMessage> getAfter(PageCursor cursor, PagePosition pos) throws Exception;
+ Pair<PagePosition, PagedMessage> getNext(PageCursor cursor, PagePosition pos) throws Exception;
PagedMessage getMessage(PagePosition pos) throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-15 20:07:39 UTC (rev 9792)
@@ -133,7 +133,7 @@
do
{
- message = cursorProvider.getAfter(this, lastPosition);
+ message = cursorProvider.getNext(this, lastPosition);
if (message != null)
{
@@ -217,10 +217,19 @@
*/
public void reloadPreparedACK(final Transaction tx, final PagePosition position)
{
- // internalAdd(position);
installTXCallback(tx, position);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.cursor.PageCursor#positionIgnored(org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public void positionIgnored(PagePosition position)
+ {
+ processACK(position);
+ }
+
+
public void processReload() throws Exception
{
if (recoveredACK != null)
@@ -247,7 +256,7 @@
// looking for holes on the ack list for redelivery
while (true)
{
- Pair<PagePosition, PagedMessage> msgCheck = cursorProvider.getAfter(this, tmpPos);
+ Pair<PagePosition, PagedMessage> msgCheck = cursorProvider.getNext(this, tmpPos);
positions = getPageInfo(tmpPos);
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-15 20:07:39 UTC (rev 9792)
@@ -17,15 +17,17 @@
import java.util.concurrent.ConcurrentMap;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursor;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.SoftValueHashMap;
import org.jboss.netty.util.internal.ConcurrentHashMap;
@@ -44,9 +46,13 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PageCursorProviderImpl.class);
+
// Attributes ----------------------------------------------------
private final PagingStore pagingStore;
+
+ private final PagingManager pagingManager;
private final StorageManager storageManager;
@@ -65,6 +71,7 @@
final ExecutorFactory executorFactory)
{
this.pagingStore = pagingStore;
+ this.pagingManager = pagingStore.getPagingManager();
this.storageManager = storageManager;
this.executorFactory = executorFactory;
}
@@ -106,23 +113,47 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
*/
- public Pair<PagePosition, PagedMessage> getAfter(PageCursor cursor, final PagePosition pos) throws Exception
+ public Pair<PagePosition, PagedMessage> getNext(final PageCursor cursor, PagePosition cursorPos) throws Exception
{
while(true)
{
- Pair<PagePosition, PagedMessage> retPos = internalAfter(pos);
+ Pair<PagePosition, PagedMessage> retPos = internalAfter(cursorPos);
-
-
- return retPos;
+ if (retPos == null)
+ {
+ return null;
+ }
+ else
+ if (retPos != null)
+ {
+ cursorPos = retPos.a;
+ if (retPos.b.getTransactionID() != 0)
+ {
+ PageTransactionInfo tx = pagingManager.getTransaction(retPos.b.getTransactionID());
+ if (tx == null)
+ {
+ log.warn("Couldn't locate page transaction " + retPos.b.getTransactionID() + ", ignoring message on position " + retPos.a);
+ cursor.positionIgnored(cursorPos);
+ }
+ else
+ {
+ if (!tx.deliverAfterCommit(cursor, cursorPos))
+ {
+ return retPos;
+ }
+ }
+ }
+ else
+ {
+ return retPos;
+ }
+ }
}
}
private Pair<PagePosition, PagedMessage> internalAfter(final PagePosition pos)
{
- // TODO: consider page transactions here to avoid receiving an uncommitted message
- // TODO: consider the case where a full page is ignored because of a TX
PagePosition retPos = pos.nextMessage();
PageCache cache = getPageCache(pos);
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-10-15 20:07:39 UTC (rev 9792)
@@ -13,12 +13,17 @@
package org.hornetq.core.paging.impl;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Pair;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
@@ -46,6 +51,8 @@
private volatile boolean rolledback = false;
private AtomicInteger numberOfMessages = new AtomicInteger(0);
+
+ private List<Pair<PageCursor, PagePosition>> lateDeliveries;
// Static --------------------------------------------------------
@@ -132,6 +139,15 @@
public synchronized void commit()
{
committed = true;
+ if (lateDeliveries != null)
+ {
+ for (Pair<PageCursor, PagePosition> pos : lateDeliveries)
+ {
+ pos.a.redeliver(pos.b);
+ }
+ }
+ lateDeliveries.clear();
+ lateDeliveries = null;
}
public void store(final StorageManager storageManager, PagingManager pagingManager, final Transaction tx) throws Exception
@@ -203,6 +219,32 @@
")";
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PageTransactionInfo#deliverAfterCommit(org.hornetq.core.paging.cursor.PageCursor, org.hornetq.core.paging.cursor.PagePosition)
+ */
+ public synchronized boolean deliverAfterCommit(PageCursor cursor, PagePosition cursorPos)
+ {
+ if (committed)
+ {
+ return false;
+ }
+ else
+ if (rolledback)
+ {
+ cursor.positionIgnored(cursorPos);
+ return true;
+ }
+ else
+ {
+ if (lateDeliveries == null)
+ {
+ lateDeliveries = new LinkedList<Pair<PageCursor, PagePosition>>();
+ }
+ lateDeliveries.add(new Pair<PageCursor, PagePosition>(cursor, cursorPos));
+ return true;
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2010-10-15 20:07:39 UTC (rev 9792)
@@ -50,7 +50,7 @@
private ServerMessage message;
- private long transactionID = -1;
+ private long transactionID = 0;
public PagedMessageImpl(final ServerMessage message, final long transactionID)
{
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-15 20:07:39 UTC (rev 9792)
@@ -377,6 +377,10 @@
cursorProvider.processReload();
}
+ public PagingManager getPagingManager()
+ {
+ return pagingManager;
+ }
// HornetQComponent implementation
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-15 12:34:13 UTC (rev 9791)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-15 20:07:39 UTC (rev 9792)
@@ -13,7 +13,9 @@
package org.hornetq.tests.integration.paging;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import junit.framework.Assert;
@@ -21,6 +23,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursor;
@@ -29,7 +32,9 @@
import org.hornetq.core.paging.cursor.impl.PageCursorImpl;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -237,6 +242,8 @@
server.stop();
+ OperationContextImpl.clearContext();
+
server.start();
cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
@@ -289,6 +296,8 @@
server.stop();
+ OperationContextImpl.clearContext();
+
server.start();
cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
@@ -359,17 +368,102 @@
}
- public void testRollbackScenariosOnACK() throws Exception
+ public void testPrepareScenarios() throws Exception
{
+ PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
+
+ pageStore.startPaging();
+
+ final int NUM_MESSAGES = 100;
+ final int messageSize = 10 * 1024;
+
+
+ PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ System.out.println("cursorProvider = " + cursorProvider);
+
+ PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ System.out.println("Cursor: " + cursor);
+
+ StorageManager storage = this.server.getStorageManager();
+
+ PageTransactionInfoImpl pgtxRollback = new PageTransactionInfoImpl(storage.generateUniqueID());
+ PageTransactionInfoImpl pgtxForgotten = new PageTransactionInfoImpl(storage.generateUniqueID());
+ PageTransactionInfoImpl pgtxCommit = new PageTransactionInfoImpl(storage.generateUniqueID());
+
+ this.server.getPagingManager().addTransaction(pgtxRollback);
+ this.server.getPagingManager().addTransaction(pgtxCommit);
+
+ pgMessages(storage, pageStore, pgtxRollback, 0, NUM_MESSAGES, messageSize);
+ pgMessages(storage, pageStore, pgtxForgotten, 100, NUM_MESSAGES, messageSize);
+ pgMessages(storage, pageStore, pgtxCommit, 200, NUM_MESSAGES, messageSize);
+
+ addMessages(300, NUM_MESSAGES, messageSize);
+
+
+ // First consume what's already there without any tx as nothing was committed
+ for (int i = 300; i < 400; i++)
+ {
+ Pair<PagePosition, PagedMessage> pos = cursor.moveNext();
+ assertNotNull("Null at position " + i, pos);
+ assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
+ cursor.ack(pos.a);
+ }
+
+ assertNull(cursor.moveNext());
+
+ pgtxRollback.rollback();
+ pgtxCommit.commit();
+ // Second:after pgtxCommit was done
+ for (int i = 200; i < 300; i++)
+ {
+ Pair<PagePosition, PagedMessage> pos = cursor.moveNext();
+ assertNotNull(pos);
+ assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
+ cursor.ack(pos.a);
+ }
+
+
}
+
+
+ /**
+ * @param storage
+ * @param pageStore
+ * @param pgParameter
+ * @param start
+ * @param NUM_MESSAGES
+ * @param messageSize
+ * @throws Exception
+ */
+ private void pgMessages(StorageManager storage,
+ PagingStoreImpl pageStore,
+ PageTransactionInfo pgParameter,
+ int start,
+ final int NUM_MESSAGES,
+ final int messageSize) throws Exception
+ {
+ List<ServerMessage> messages = new ArrayList<ServerMessage>();
+
+ for (int i = start ; i < start + NUM_MESSAGES; i++)
+ {
+ HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
+ ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(), buffer.writerIndex());
+ msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
+ msg.putIntProperty("key", i);
+ messages.add(msg);
+ }
+
+ pageStore.page(messages, pgParameter.getTransactionID());
+ }
- public void testReadRolledBackData() throws Exception
+ public void testRollbackScenariosOnACK() throws Exception
{
}
- public void testPrepareScenarios() throws Exception
+ public void testReadRolledBackData() throws Exception
{
}
@@ -398,19 +492,24 @@
{
}
+
+ private int addMessages(final int numMessages, final int messageSize) throws Exception
+ {
+ return addMessages(0, numMessages, messageSize);
+ }
/**
* @param numMessages
* @param pageStore
* @throws Exception
*/
- private int addMessages(final int numMessages, final int messageSize) throws Exception
+ private int addMessages(final int start, final int numMessages, final int messageSize) throws Exception
{
PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
pageStore.startPaging();
- for (int i = 0; i < numMessages; i++)
+ for (int i = start; i < start + numMessages; i++)
{
if (i % 100 == 0) System.out.println("Paged " + i);
HornetQBuffer buffer = RandomUtil.randomBuffer(messageSize, i + 1l);
@@ -464,7 +563,6 @@
protected void tearDown() throws Exception
{
- OperationContextImpl.clearContext();
server.stop();
super.tearDown();
}
13 years, 6 months
JBoss hornetq SVN: r9791 - in branches/hornetq-416/src/main/org/hornetq: jms/management/impl and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-10-15 08:34:13 -0400 (Fri, 15 Oct 2010)
New Revision: 9791
Removed:
branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/ConnectionSetClientIDMessage.java
Modified:
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
Log:
added principal
Deleted: branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/ConnectionSetClientIDMessage.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/ConnectionSetClientIDMessage.java 2010-10-15 11:38:17 UTC (rev 9790)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/ConnectionSetClientIDMessage.java 2010-10-15 12:34:13 UTC (rev 9791)
@@ -1,56 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * A ConnectionSetClientIDMessage
- *
- *
- */
-public class ConnectionSetClientIDMessage extends PacketImpl
-{
- private String clientID;
-
- public ConnectionSetClientIDMessage()
- {
- super(PacketImpl.CONNECTION_SET_CLIENTID);
- }
-
- public ConnectionSetClientIDMessage(String cID)
- {
- super(PacketImpl.CONNECTION_SET_CLIENTID);
- clientID = cID;
- }
-
- @Override
- public void encodeRest(final HornetQBuffer buffer)
- {
- buffer.writeString(clientID);
- }
-
- @Override
- public void decodeRest(final HornetQBuffer buffer)
- {
- clientID = buffer.readString();
- }
-
- public String getClientID()
- {
- return clientID;
- }
-
-}
Modified: branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-10-15 11:38:17 UTC (rev 9790)
+++ branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-10-15 12:34:13 UTC (rev 9791)
@@ -739,20 +739,25 @@
Set<RemotingConnection> connections = server.getHornetQServer().getRemotingService().getConnections();
Set<ServerSession> sessions = server.getHornetQServer().getSessions();
- Map<Object, String> clientIDs = new HashMap<Object, String>();
+
+ Map<Object, ServerSession> initialSessions = new HashMap<Object, ServerSession>();
+
for (ServerSession session : sessions)
{
- if (session.getMetaData("jms-initial-session") != null) {
- clientIDs.put(session.getConnectionID(), session.getMetaData("jms-client-id"));
+ if (session.getMetaData("jms-initial-session") != null)
+ {
+ initialSessions.put(session.getConnectionID(), session);
}
}
+
for (RemotingConnection connection : connections)
{
JSONObject obj = new JSONObject();
obj.put("connectionID", connection.getID().toString());
obj.put("clientAddress", connection.getRemoteAddress());
obj.put("creationTime", connection.getCreationTime());
- obj.put("clientID", clientIDs.get(connection.getID()));
+ obj.put("clientID", initialSessions.get(connection.getID()).getMetaData("jms-client-id"));
+ obj.put("principal", initialSessions.get(connection.getID()).getUsername());
array.put(obj);
}
return array.toString();
13 years, 6 months
JBoss hornetq SVN: r9790 - in branches/hornetq-416/src/main/org/hornetq: core/protocol/core/impl/wireformat and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-10-15 07:38:17 -0400 (Fri, 15 Oct 2010)
New Revision: 9790
Modified:
branches/hornetq-416/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java
branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java
Log:
clientID added
Modified: branches/hornetq-416/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-10-15 03:55:57 UTC (rev 9789)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-10-15 11:38:17 UTC (rev 9790)
@@ -463,6 +463,7 @@
}
case PacketImpl.SESS_ADD_METADATA:
{
+ response = new NullResponseMessage();
SessionAddMetaDataMessage message = (SessionAddMetaDataMessage)packet;
session.addMetaData(message.getKey(), message.getData());
break;
Modified: branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java 2010-10-15 03:55:57 UTC (rev 9789)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java 2010-10-15 11:38:17 UTC (rev 9790)
@@ -19,7 +19,7 @@
/**
* A SessionAddMetaDataMessage
*
- * @author howard
+ * @author <a href="mailto:hgao@redhat.com>Howard Gao</a>
*
*
*/
@@ -54,6 +54,12 @@
data = buffer.readString();
}
+ @Override
+ public final boolean isRequiresConfirmations()
+ {
+ return false;
+ }
+
public String getKey()
{
return key;
Modified: branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-10-15 03:55:57 UTC (rev 9789)
+++ branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-10-15 11:38:17 UTC (rev 9790)
@@ -573,7 +573,10 @@
initialSession = sessionFactory.createSession(username, password, false, false, false, false, 0);
//mark it is a jms initial session
initialSession.addMetaData("jms-initial-session", "");
- initialSession.addMetaData("jms-username", username);
+ if (clientID != null)
+ {
+ initialSession.addMetaData("jms-client-id", clientID);
+ }
initialSession.addFailureListener(listener);
}
13 years, 6 months
JBoss hornetq SVN: r9789 - in branches/hornetq-416: src/main/org/hornetq/core/client/impl and 11 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-10-14 23:55:57 -0400 (Thu, 14 Oct 2010)
New Revision: 9789
Added:
branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java
Modified:
branches/hornetq-416/src/main/org/hornetq/api/core/client/ClientSession.java
branches/hornetq-416/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/hornetq-416/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/hornetq-416/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/hornetq-416/src/main/org/hornetq/core/remoting/server/RemotingService.java
branches/hornetq-416/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java
branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
branches/hornetq-416/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
branches/hornetq-416/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
Log:
refactor setclientid
Modified: branches/hornetq-416/src/main/org/hornetq/api/core/client/ClientSession.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/core/client/ClientSession.java 2010-10-15 01:18:33 UTC (rev 9788)
+++ branches/hornetq-416/src/main/org/hornetq/api/core/client/ClientSession.java 2010-10-15 03:55:57 UTC (rev 9789)
@@ -563,9 +563,8 @@
void setSendAcknowledgementHandler(SendAcknowledgementHandler handler);
/**
- * Sets ClientID of the associated JMS connection.
- * @param clientID the client ID
+ * Attach any metadata to the session.
+ * @throws HornetQException
*/
- void setClientID(String clientID);
-
+ void addMetaData(String key, String data) throws HornetQException;
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
+++ branches/hornetq-416/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-10-15 03:55:57 UTC (rev 9789)
@@ -39,13 +39,13 @@
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.PacketImpl;
-import org.hornetq.core.protocol.core.impl.wireformat.ConnectionSetClientIDMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionCloseMessage;
@@ -1812,9 +1812,8 @@
}
- public void setClientID(String clientID)
+ public void addMetaData(String key, String data) throws HornetQException
{
- ConnectionSetClientIDMessage msg = new ConnectionSetClientIDMessage(clientID);
- channel.send(msg);
+ channel.sendBlocking(new SessionAddMetaDataMessage(key, data));
}
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-10-15 01:18:33 UTC (rev 9788)
+++ branches/hornetq-416/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-10-15 03:55:57 UTC (rev 9789)
@@ -556,8 +556,8 @@
session.setPacketSize(packetSize);
}
- public void setClientID(String clientID)
+ public void addMetaData(String key, String data) throws HornetQException
{
- session.setClientID(clientID);
+ session.addMetaData(key, data);
}
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-10-15 01:18:33 UTC (rev 9788)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-10-15 03:55:57 UTC (rev 9789)
@@ -57,12 +57,12 @@
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.protocol.core.impl.PacketImpl;
-import org.hornetq.core.protocol.core.impl.wireformat.ConnectionSetClientIDMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
@@ -461,10 +461,11 @@
session.requestProducerCredits(message.getAddress(), message.getCredits());
break;
}
- case PacketImpl.CONNECTION_SET_CLIENTID:
+ case PacketImpl.SESS_ADD_METADATA:
{
- ConnectionSetClientIDMessage message = (ConnectionSetClientIDMessage)packet;
- session.setConnectionClientID(message.getClientID());
+ SessionAddMetaDataMessage message = (SessionAddMetaDataMessage)packet;
+ session.addMetaData(message.getKey(), message.getData());
+ break;
}
}
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-10-15 01:18:33 UTC (rev 9788)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-10-15 03:55:57 UTC (rev 9789)
@@ -78,12 +78,11 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_START;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
-import static org.hornetq.core.protocol.core.impl.PacketImpl.CONNECTION_SET_CLIENTID;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.protocol.core.impl.wireformat.ConnectionSetClientIDMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
@@ -109,6 +108,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionCloseMessage;
@@ -488,9 +488,9 @@
packet = new SessionForceConsumerDelivery();
break;
}
- case CONNECTION_SET_CLIENTID:
+ case SESS_ADD_METADATA:
{
- packet = new ConnectionSetClientIDMessage();
+ packet = new SessionAddMetaDataMessage();
break;
}
default:
Modified: branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-10-15 03:55:57 UTC (rev 9789)
@@ -182,7 +182,7 @@
public static final byte REPLICATION_SYNC = 103;
- public static final byte CONNECTION_SET_CLIENTID = 104;
+ public static final byte SESS_ADD_METADATA = 104;
// Static --------------------------------------------------------
Added: branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java (rev 0)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java 2010-10-15 03:55:57 UTC (rev 9789)
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Packet of type {@link PacketImpl#SESS_ADD_METADATA} carrying one
+ * key/data string pair to be attached to a session as metadata.
+ *
+ * @author howard
+ *
+ */
+public class SessionAddMetaDataMessage extends PacketImpl
+{
+ private String key;
+ private String data;
+
+ public SessionAddMetaDataMessage()
+ {
+ super(PacketImpl.SESS_ADD_METADATA);
+ }
+
+ public SessionAddMetaDataMessage(String k, String d)
+ {
+ this();
+ key = k;
+ data = d;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeString(key);
+ buffer.writeString(data);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ key = buffer.readString();
+ data = buffer.readString();
+ }
+
+ public String getKey()
+ {
+ return key;
+ }
+
+ public String getData()
+ {
+ return data;
+ }
+
+}
Modified: branches/hornetq-416/src/main/org/hornetq/core/remoting/server/RemotingService.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/remoting/server/RemotingService.java 2010-10-15 01:18:33 UTC (rev 9788)
+++ branches/hornetq-416/src/main/org/hornetq/core/remoting/server/RemotingService.java 2010-10-15 03:55:57 UTC (rev 9789)
@@ -45,6 +45,4 @@
void freeze();
RemotingConnection getServerSideReplicatingConnection();
-
- void setConnectionClientID(Object connID, String clientID);
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
+++ branches/hornetq-416/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-10-15 03:55:57 UTC (rev 9789)
@@ -571,9 +571,4 @@
}
}
- public void setConnectionClientID(Object connID, String clientID)
- {
- ConnectionEntry conn = connections.get(connID);
- conn.connection.setClientID(clientID);
- }
}
\ No newline at end of file
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java 2010-10-15 01:18:33 UTC (rev 9788)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java 2010-10-15 03:55:57 UTC (rev 9789)
@@ -114,5 +114,7 @@
Set<ServerConsumer> getServerConsumers();
- void setConnectionClientID(String clientID);
+ void addMetaData(String key, String data);
+
+ String getMetaData(String key);
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-10-15 03:55:57 UTC (rev 9789)
@@ -136,6 +136,8 @@
private volatile SimpleString defaultAddress;
private volatile int timeoutSeconds;
+
+ private Map<String, String> metaData;
// Constructors ---------------------------------------------------------------------------------
@@ -1183,9 +1185,23 @@
routingContext.clear();
}
- public void setConnectionClientID(String clientID)
+ public void addMetaData(String key, String data)
{
- this.server.getRemotingService().setConnectionClientID(this.getConnectionID(), clientID);
+ if (metaData == null)
+ {
+ metaData = new HashMap<String, String>();
+ }
+ metaData.put(key, data);
}
+ public String getMetaData(String key)
+ {
+ String data = null;
+ if (metaData != null)
+ {
+ data = metaData.get(key);
+ }
+ return data;
+ }
+
}
Modified: branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-10-15 01:18:33 UTC (rev 9788)
+++ branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-10-15 03:55:57 UTC (rev 9789)
@@ -181,7 +181,16 @@
}
this.clientID = clientID;
- initialSession.setClientID(clientID);
+ try
+ {
+ initialSession.addMetaData("jms-client-id", clientID);
+ }
+ catch (HornetQException e)
+ {
+ JMSException ex = new JMSException("Internal erro setting metadata jms-client-id");
+ ex.setLinkedException(e);
+ throw ex;
+ }
justCreated = false;
}
@@ -562,6 +571,9 @@
try
{
initialSession = sessionFactory.createSession(username, password, false, false, false, false, 0);
+ // mark this session as the JMS initial session
+ initialSession.addMetaData("jms-initial-session", "");
+ initialSession.addMetaData("jms-username", username);
initialSession.addFailureListener(listener);
}
Modified: branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
+++ branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-10-15 03:55:57 UTC (rev 9789)
@@ -14,6 +14,7 @@
package org.hornetq.jms.management.impl;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -736,13 +737,22 @@
JSONArray array = new JSONArray();
Set<RemotingConnection> connections = server.getHornetQServer().getRemotingService().getConnections();
+
+ Set<ServerSession> sessions = server.getHornetQServer().getSessions();
+ Map<Object, String> clientIDs = new HashMap<Object, String>();
+ for (ServerSession session : sessions)
+ {
+ if (session.getMetaData("jms-initial-session") != null) {
+ clientIDs.put(session.getConnectionID(), session.getMetaData("jms-client-id"));
+ }
+ }
for (RemotingConnection connection : connections)
{
JSONObject obj = new JSONObject();
obj.put("connectionID", connection.getID().toString());
obj.put("clientAddress", connection.getRemoteAddress());
obj.put("creationTime", connection.getCreationTime());
- obj.put("clientID", connection.getClientID());
+ obj.put("clientID", clientIDs.get(connection.getID()));
array.put(obj);
}
return array.toString();
Modified: branches/hornetq-416/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-10-15 01:18:33 UTC (rev 9788)
+++ branches/hornetq-416/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-10-15 03:55:57 UTC (rev 9789)
@@ -166,17 +166,4 @@
*/
void flush();
- /**
- * set the connections's clientID
- *
- * @param clientID the clientID
- */
- void setClientID(String clientID);
-
- /**
- * get the client id
- * @return the clientID
- */
- String getClientID();
-
}
Modified: branches/hornetq-416/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
===================================================================
--- branches/hornetq-416/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2010-10-15 01:18:33 UTC (rev 9788)
+++ branches/hornetq-416/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2010-10-15 03:55:57 UTC (rev 9789)
@@ -1399,6 +1399,15 @@
// TODO Auto-generated method stub
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ClientSession#addMetaData(java.lang.String, java.lang.String)
+ */
+ public void addMetaData(String key, String data) throws HornetQException
+ {
+ // TODO Auto-generated method stub
+
+ }
}
}
13 years, 6 months
JBoss hornetq SVN: r9788 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 6 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-14 21:18:33 -0400 (Thu, 14 Oct 2010)
New Revision: 9788
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_New_Paging/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Treating PageTransactions over the cursors
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -24,8 +24,6 @@
*/
public interface PageTransactionInfo extends EncodingSupport
{
- boolean waitCompletion(int timeoutMilliSeconds) throws Exception;
-
boolean isCommit();
boolean isRollback();
@@ -45,11 +43,9 @@
void storeUpdate(StorageManager storageManager, PagingManager pagingManager, Transaction tx, int depages) throws Exception;
// To be used after the update was stored or reload
- void update(int update, StorageManager storageManager, PagingManager pagingManager);
+ void onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
void increment();
int getNumberOfMessages();
-
- void markIncomplete();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -28,7 +28,9 @@
*/
public interface PagedMessage extends EncodingSupport
{
- ServerMessage getMessage(StorageManager storageManager);
+ ServerMessage getMessage();
+
+ void initMessage(StorageManager storageManager);
long getTransactionID();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/LivePageCache.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -13,7 +13,7 @@
package org.hornetq.core.paging.cursor;
-import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.paging.PagedMessage;
/**
* A LivePageCache
@@ -25,7 +25,5 @@
public interface LivePageCache extends PageCache
{
- void addLiveMessage(ServerMessage message);
-
- void close();
+ void addLiveMessage(PagedMessage message);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCache.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -14,7 +14,7 @@
package org.hornetq.core.paging.cursor;
import org.hornetq.core.paging.Page;
-import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.paging.PagedMessage;
/**
* A PageCache
@@ -29,7 +29,7 @@
int getNumberOfMessages();
- void setMessages(ServerMessage[] messages);
+ void setMessages(PagedMessage[] messages);
/**
* If this cache is still being updated
@@ -42,7 +42,7 @@
* @param messageNumber The order of the message on the page
* @return
*/
- ServerMessage getMessage(int messageNumber);
+ PagedMessage getMessage(int messageNumber);
/**
* When the cache is being created,
@@ -54,4 +54,7 @@
* You have to call this method within the same thread you called lock
*/
void unlock();
+
+ void close();
+
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.cursor;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
@@ -31,7 +32,7 @@
void stop();
- Pair<PagePosition, ServerMessage> moveNext() throws Exception;
+ Pair<PagePosition, PagedMessage> moveNext() throws Exception;
void ack(PagePosition position) throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.cursor;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.server.ServerMessage;
@@ -56,9 +57,9 @@
*/
PageCursor createCursor();
- Pair<PagePosition, ServerMessage> getAfter(PagePosition pos) throws Exception;
+ Pair<PagePosition, PagedMessage> getAfter(PageCursor cursor, PagePosition pos) throws Exception;
- ServerMessage getMessage(PagePosition pos) throws Exception;
+ PagedMessage getMessage(PagePosition pos) throws Exception;
void processReload() throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -17,6 +17,7 @@
import java.util.List;
import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.LivePageCache;
import org.hornetq.core.server.ServerMessage;
@@ -33,7 +34,7 @@
// Attributes ----------------------------------------------------
- private final List<ServerMessage> messages = new LinkedList<ServerMessage>();
+ private final List<PagedMessage> messages = new LinkedList<PagedMessage>();
private final Page page;
@@ -74,10 +75,10 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCache#setMessages(org.hornetq.core.server.ServerMessage[])
*/
- public synchronized void setMessages(ServerMessage[] messages)
+ public synchronized void setMessages(PagedMessage[] messages)
{
// This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
- for (ServerMessage msg : messages)
+ for (PagedMessage msg : messages)
{
addLiveMessage(msg);
}
@@ -86,7 +87,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCache#getMessage(int)
*/
- public synchronized ServerMessage getMessage(int messageNumber)
+ public synchronized PagedMessage getMessage(int messageNumber)
{
if (messageNumber < messages.size())
{
@@ -125,7 +126,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.LivePageCache#addLiveMessage(org.hornetq.core.server.ServerMessage)
*/
- public synchronized void addLiveMessage(ServerMessage message)
+ public synchronized void addLiveMessage(PagedMessage message)
{
this.messages.add(message);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -17,8 +17,8 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.server.ServerMessage;
/**
* The caching associated to a single page.
@@ -29,28 +29,28 @@
*/
public class PageCacheImpl implements PageCache
{
-
+
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- private ServerMessage[] messages;
-
+
+ private PagedMessage[] messages;
+
private final Page page;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
-
- public PageCacheImpl(Page page)
+
+ public PageCacheImpl(final Page page)
{
this.page = page;
}
// Public --------------------------------------------------------
-
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCache#getPage()
*/
@@ -62,7 +62,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCache#getMessage(int)
*/
- public ServerMessage getMessage(int messageNumber)
+ public PagedMessage getMessage(final int messageNumber)
{
lock.readLock().lock();
try
@@ -81,22 +81,22 @@
lock.readLock().unlock();
}
}
-
+
public void lock()
{
lock.writeLock().lock();
}
-
+
public void unlock()
{
lock.writeLock().unlock();
}
-
- public void setMessages(ServerMessage[] messages)
+
+ public void setMessages(final PagedMessage[] messages)
{
this.messages = messages;
}
-
+
public int getNumberOfMessages()
{
lock.readLock().lock();
@@ -110,6 +110,10 @@
}
}
+ public void close()
+ {
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCache#isLive()
*/
@@ -117,13 +121,13 @@
{
return false;
}
-
+
+ @Override
public String toString()
{
return "PageCacheImpl::page=" + page.getPageId() + " numberOfMessages = " + messages.length;
}
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -29,6 +29,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursor;
@@ -109,14 +110,14 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
- public synchronized Pair<PagePosition, ServerMessage> moveNext() throws Exception
+ public synchronized Pair<PagePosition, PagedMessage> moveNext() throws Exception
{
PagePosition redeliveryPos = null;
// Redeliveries will take precedence
if ((redeliveryPos = redeliveries.poll()) != null)
{
- return new Pair<PagePosition, ServerMessage>(redeliveryPos, cursorProvider.getMessage(redeliveryPos));
+ return new Pair<PagePosition, PagedMessage>(redeliveryPos, cursorProvider.getMessage(redeliveryPos));
}
if (lastPosition == null)
@@ -128,17 +129,17 @@
boolean match = false;
- Pair<PagePosition, ServerMessage> message = null;
+ Pair<PagePosition, PagedMessage> message = null;
do
{
- message = cursorProvider.getAfter(lastPosition);
+ message = cursorProvider.getAfter(this, lastPosition);
if (message != null)
{
lastPosition = message.a;
- match = match(message.b);
+ match = match(message.b.getMessage());
if (!match)
{
@@ -246,7 +247,7 @@
// looking for holes on the ack list for redelivery
while (true)
{
- Pair<PagePosition, ServerMessage> msgCheck = cursorProvider.getAfter(tmpPos);
+ Pair<PagePosition, PagedMessage> msgCheck = cursorProvider.getAfter(this, tmpPos);
positions = getPageInfo(tmpPos);
@@ -258,7 +259,7 @@
}
else
{
- if (match(msgCheck.b))
+ if (match(msgCheck.b.getMessage()))
{
redeliver(msgCheck.a);
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -106,8 +106,21 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
*/
- public Pair<PagePosition, ServerMessage> getAfter(final PagePosition pos) throws Exception
+ public Pair<PagePosition, PagedMessage> getAfter(PageCursor cursor, final PagePosition pos) throws Exception
{
+
+ while(true)
+ {
+ Pair<PagePosition, PagedMessage> retPos = internalAfter(pos);
+
+
+
+ return retPos;
+ }
+ }
+
+ private Pair<PagePosition, PagedMessage> internalAfter(final PagePosition pos)
+ {
// TODO: consider page transactions here to avoid receiving an uncommitted message
// TODO: consider the case where a full page is ignored because of a TX
PagePosition retPos = pos.nextMessage();
@@ -131,11 +144,11 @@
}
}
- ServerMessage serverMessage = cache.getMessage(retPos.getMessageNr());
+ PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
if (serverMessage != null)
{
- return new Pair<PagePosition, ServerMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
+ return new Pair<PagePosition, PagedMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
}
else
{
@@ -143,7 +156,7 @@
}
}
- public ServerMessage getMessage(final PagePosition pos) throws Exception
+ public PagedMessage getMessage(final PagePosition pos) throws Exception
{
PageCache cache = getPageCache(pos);
@@ -257,16 +270,13 @@
List<PagedMessage> pgdMessages = page.read();
- ServerMessage srvMessages[] = new ServerMessage[pgdMessages.size()];
-
int i = 0;
for (PagedMessage pdgMessage : pgdMessages)
{
- ServerMessage message = pdgMessage.getMessage(storageManager);
- srvMessages[i++] = message;
+ pdgMessage.initMessage(storageManager);
}
- cache.setMessages(srvMessages);
+ cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()]));
}
finally
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -181,7 +181,7 @@
if (pageCache != null)
{
- pageCache.addLiveMessage(message.getMessage(storageManager));
+ pageCache.addLiveMessage(message);
}
numberOfMessages.incrementAndGet();
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -13,8 +13,6 @@
package org.hornetq.core.paging.impl;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.HornetQBuffer;
@@ -43,12 +41,10 @@
private volatile long recordID = -1;
- private volatile CountDownLatch countDownCompleted;
+ private volatile boolean committed = false;
- private volatile boolean committed;
+ private volatile boolean rolledback = false;
- private volatile boolean rolledback;
-
private AtomicInteger numberOfMessages = new AtomicInteger(0);
// Static --------------------------------------------------------
@@ -59,7 +55,6 @@
{
this();
this.transactionID = transactionID;
- countDownCompleted = new CountDownLatch(1);
}
public PageTransactionInfoImpl()
@@ -83,7 +78,7 @@
return transactionID;
}
- public void update(final int update, final StorageManager storageManager, PagingManager pagingManager)
+ public void onUpdate(final int update, final StorageManager storageManager, PagingManager pagingManager)
{
int sizeAfterUpdate = numberOfMessages.addAndGet(-update);
if (sizeAfterUpdate == 0 && storageManager != null)
@@ -120,7 +115,6 @@
{
transactionID = buffer.readLong();
numberOfMessages.set(buffer.readInt());
- countDownCompleted = null;
committed = true;
}
@@ -135,27 +129,11 @@
return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
}
- public void commit()
+ public synchronized void commit()
{
committed = true;
- /**
- * this is to avoid a race condition where the transaction still being committed while another thread is depaging messages
- */
- countDownCompleted.countDown();
}
- public boolean waitCompletion(final int timeoutMilliseconds) throws InterruptedException
- {
- if (countDownCompleted == null)
- {
- return true;
- }
- else
- {
- return countDownCompleted.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
- }
- }
-
public void store(final StorageManager storageManager, PagingManager pagingManager, final Transaction tx) throws Exception
{
storageManager.storePageTransaction(tx.getID(), this);
@@ -194,7 +172,7 @@
public void afterCommit(Transaction tx)
{
- pgToUpdate.update(depages, storageManager, pagingManager);
+ pgToUpdate.onUpdate(depages, storageManager, pagingManager);
}
});
}
@@ -209,21 +187,12 @@
return rolledback;
}
- public void rollback()
+ public synchronized void rollback()
{
rolledback = true;
committed = false;
- countDownCompleted.countDown();
}
- public void markIncomplete()
- {
- committed = false;
- rolledback = false;
-
- countDownCompleted = new CountDownLatch(1);
- }
-
public String toString()
{
return "PageTransactionInfoImpl(transactionID=" + transactionID +
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -67,8 +67,13 @@
{
}
- public ServerMessage getMessage(final StorageManager storage)
+ public ServerMessage getMessage()
{
+ return message;
+ }
+
+ public void initMessage(StorageManager storage)
+ {
if (largeMessageLazyData != null)
{
message = storage.createLargeMessage();
@@ -76,7 +81,6 @@
message.decodeHeadersAndProperties(buffer);
largeMessageLazyData = null;
}
- return message;
}
public long getTransactionID()
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -1019,7 +1019,7 @@
for (PagedMessage pagedMessage : pagedMessages)
{
- ServerMessage message = pagedMessage.getMessage(storageManager);
+ ServerMessage message = pagedMessage.getMessage();
if (message.isLargeMessage())
{
@@ -1060,7 +1060,7 @@
// This is to avoid a race condition where messages are depaged
// before the commit arrived
- while (running && !pageUserTransaction.waitCompletion(500))
+ while (running)
{
// This is just to give us a chance to interrupt the process..
// if we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -948,7 +948,7 @@
PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX);
- pageTX.update(pageUpdate.recods, null, null);
+ pageTX.onUpdate(pageUpdate.recods, null, null);
}
else
{
@@ -1534,8 +1534,6 @@
pageTransactionInfo.decode(buff);
- pageTransactionInfo.markIncomplete();
-
tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
pagingManager.addTransaction(pageTransactionInfo);
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -548,7 +548,8 @@
private void handlePageWrite(final ReplicationPageWriteMessage packet) throws Exception
{
PagedMessage pgdMessage = packet.getPagedMessage();
- ServerMessage msg = pgdMessage.getMessage(storage);
+ pgdMessage.initMessage(storage);
+ ServerMessage msg = pgdMessage.getMessage();
Page page = getPage(msg.getAddress(), packet.getPageNumber());
page.write(pgdMessage);
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursor;
import org.hornetq.core.paging.cursor.PageCursorProvider;
@@ -110,12 +111,12 @@
PageCursor cursor = cursorProvider.createCursor();
- Pair<PagePosition, ServerMessage> msg;
+ Pair<PagePosition, PagedMessage> msg;
int key = 0;
while ((msg = cursor.moveNext()) != null)
{
- assertEquals(key++, msg.b.getIntProperty("key").intValue());
+ assertEquals(key++, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
}
assertEquals(NUM_MESSAGES, key);
@@ -169,9 +170,9 @@
for (int i = 0 ; i < 1000 ; i++)
{
- Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertNotNull(msg);
- assertEquals(i, msg.b.getIntProperty("key").intValue());
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
if (i < firstPageSize)
{
@@ -193,9 +194,9 @@
for (int i = firstPageSize; i < NUM_MESSAGES; i++)
{
System.out.println("Received " + i);
- Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
assertNotNull(msg);
- assertEquals(i, msg.b.getIntProperty("key").intValue());
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
@@ -226,8 +227,8 @@
System.out.println("Cursor: " + cursor);
for (int i = 0 ; i < 100 ; i++)
{
- Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
- assertEquals(i, msg.b.getIntProperty("key").intValue());
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
cursor.ack(msg.a);
@@ -242,16 +243,16 @@
for (int i = 10; i <= 20; i++)
{
- Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
- assertEquals(i, msg.b.getIntProperty("key").intValue());
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
}
for (int i = 100; i < NUM_MESSAGES; i++)
{
- Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
- assertEquals(i, msg.b.getIntProperty("key").intValue());
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ack(msg.a);
}
@@ -276,8 +277,8 @@
Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
for (int i = 0 ; i < 100 ; i++)
{
- Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
- assertEquals(i, msg.b.getIntProperty("key").intValue());
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
cursor.ackTx(tx, msg.a);
@@ -296,15 +297,15 @@
for (int i = 10; i <= 20; i++)
{
- Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
- assertEquals(i, msg.b.getIntProperty("key").intValue());
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ackTx(tx,msg.a);
}
for (int i = 100; i < NUM_MESSAGES; i++)
{
- Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
- assertEquals(i, msg.b.getIntProperty("key").intValue());
+ Pair<PagePosition, PagedMessage> msg = cursor.moveNext();
+ assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
cursor.ackTx(tx,msg.a);
}
@@ -344,13 +345,13 @@
Assert.assertTrue(pageStore.page(msg));
- Pair<PagePosition, ServerMessage> readMessage = cursor.moveNext();
+ Pair<PagePosition, PagedMessage> readMessage = cursor.moveNext();
assertNotNull(readMessage);
cursor.ack(readMessage.a);
- assertEquals(i, readMessage.b.getIntProperty("key").intValue());
+ assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
assertNull(cursor.moveNext());
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -106,10 +106,10 @@
for (int i = 0; i < msgs.size(); i++)
{
- Assert.assertEquals(simpleDestination, msgs.get(i).getMessage(null).getAddress());
+ Assert.assertEquals(simpleDestination, msgs.get(i).getMessage().getAddress());
UnitTestCase.assertEqualsByteArrays(buffers.get(i).toByteBuffer().array(), msgs.get(i)
- .getMessage(null)
+ .getMessage()
.getBodyBuffer()
.toByteBuffer()
.array());
@@ -178,10 +178,10 @@
for (int i = 0; i < msgs.size(); i++)
{
- Assert.assertEquals(simpleDestination, msgs.get(i).getMessage(null).getAddress());
+ Assert.assertEquals(simpleDestination, msgs.get(i).getMessage().getAddress());
UnitTestCase.assertEqualsByteArrays(buffers.get(i).toByteBuffer().array(), msgs.get(i)
- .getMessage(null)
+ .getMessage()
.getBodyBuffer()
.toByteBuffer()
.array());
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -98,7 +98,7 @@
Assert.assertEquals(1, msgs.size());
UnitTestCase.assertEqualsByteArrays(msg.getBodyBuffer().writerIndex(), msg.getBodyBuffer().toByteBuffer().array(), msgs.get(0)
- .getMessage(null)
+ .getMessage()
.getBodyBuffer()
.toByteBuffer()
.array());
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-14 21:40:19 UTC (rev 9787)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-10-15 01:18:33 UTC (rev 9788)
@@ -290,7 +290,7 @@
for (int i = 0; i < numMessages; i++)
{
HornetQBuffer horn1 = buffers.get(i);
- HornetQBuffer horn2 = msg.get(i).getMessage(null).getBodyBuffer();
+ HornetQBuffer horn2 = msg.get(i).getMessage().getBodyBuffer();
horn1.resetReaderIndex();
horn2.resetReaderIndex();
for (int j = 0; j < horn1.writerIndex(); j++)
@@ -368,9 +368,9 @@
for (int i = 0; i < 5; i++)
{
- Assert.assertEquals(sequence++, msg.get(i).getMessage(null).getMessageID());
+ Assert.assertEquals(sequence++, msg.get(i).getMessage().getMessageID());
UnitTestCase.assertEqualsBuffers(18, buffers.get(pageNr * 5 + i), msg.get(i)
- .getMessage(null)
+ .getMessage()
.getBodyBuffer());
}
}
@@ -413,9 +413,9 @@
Assert.assertEquals(1, msgs.size());
- Assert.assertEquals(1l, msgs.get(0).getMessage(null).getMessageID());
+ Assert.assertEquals(1l, msgs.get(0).getMessage().getMessageID());
- UnitTestCase.assertEqualsBuffers(18, buffers.get(0), msgs.get(0).getMessage(null).getBodyBuffer());
+ UnitTestCase.assertEqualsBuffers(18, buffers.get(0), msgs.get(0).getMessage().getBodyBuffer());
Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -594,14 +594,14 @@
for (PagedMessage msg : msgs)
{
- long id = msg.getMessage(null).getBodyBuffer().readLong();
- msg.getMessage(null).getBodyBuffer().resetReaderIndex();
+ long id = msg.getMessage().getBodyBuffer().readLong();
+ msg.getMessage().getBodyBuffer().resetReaderIndex();
ServerMessage msgWritten = buffers.remove(id);
- buffers2.put(id, msg.getMessage(null));
+ buffers2.put(id, msg.getMessage());
Assert.assertNotNull(msgWritten);
- Assert.assertEquals(msg.getMessage(null).getAddress(), msgWritten.getAddress());
- UnitTestCase.assertEqualsBuffers(10, msgWritten.getBodyBuffer(), msg.getMessage(null).getBodyBuffer());
+ Assert.assertEquals(msg.getMessage().getAddress(), msgWritten.getAddress());
+ UnitTestCase.assertEqualsBuffers(10, msgWritten.getBodyBuffer(), msg.getMessage().getBodyBuffer());
}
}
@@ -667,13 +667,13 @@
for (PagedMessage msg : msgs)
{
- long id = msg.getMessage(null).getBodyBuffer().readLong();
+ long id = msg.getMessage().getBodyBuffer().readLong();
ServerMessage msgWritten = buffers2.remove(id);
Assert.assertNotNull(msgWritten);
- Assert.assertEquals(msg.getMessage(null).getAddress(), msgWritten.getAddress());
+ Assert.assertEquals(msg.getMessage().getAddress(), msgWritten.getAddress());
UnitTestCase.assertEqualsByteArrays(msgWritten.getBodyBuffer().writerIndex(),
msgWritten.getBodyBuffer().toByteBuffer().array(),
- msg.getMessage(null).getBodyBuffer().toByteBuffer().array());
+ msg.getMessage().getBodyBuffer().toByteBuffer().array());
}
}
@@ -682,8 +682,8 @@
lastPage.close();
Assert.assertEquals(1, lastMessages.size());
- lastMessages.get(0).getMessage(null).getBodyBuffer().resetReaderIndex();
- Assert.assertEquals(lastMessages.get(0).getMessage(null).getBodyBuffer().readLong(), lastMessageId);
+ lastMessages.get(0).getMessage().getBodyBuffer().resetReaderIndex();
+ Assert.assertEquals(lastMessages.get(0).getMessage().getBodyBuffer().readLong(), lastMessageId);
Assert.assertEquals(0, buffers2.size());
@@ -796,7 +796,7 @@
for (PagedMessage pgmsg : messages)
{
- ServerMessage msg = pgmsg.getMessage(null);
+ ServerMessage msg = pgmsg.getMessage();
assertEquals(msgsRead++, msg.getMessageID());
13 years, 6 months
JBoss hornetq SVN: r9787 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-14 17:40:19 -0400 (Thu, 14 Oct 2010)
New Revision: 9787
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Improving cleanup
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-14 09:43:10 UTC (rev 9786)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java 2010-10-14 21:40:19 UTC (rev 9787)
@@ -29,6 +29,8 @@
// Cursor query operations --------------------------------------
+ void stop();
+
Pair<PagePosition, ServerMessage> moveNext() throws Exception;
void ack(PagePosition position) throws Exception;
@@ -55,4 +57,6 @@
* @param position
*/
void redeliver(PagePosition position);
+
+ void printDebug();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-14 09:43:10 UTC (rev 9786)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
@@ -41,6 +41,7 @@
import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.utils.Future;
/**
* A PageCursorImpl
@@ -57,11 +58,11 @@
// Attributes ----------------------------------------------------
- private final boolean isTrace = true; //PageCursorImpl.log.isTraceEnabled();
+ private final boolean isTrace = false; // PageCursorImpl.log.isTraceEnabled();
private static void trace(final String message)
{
- //PageCursorImpl.log.info(message);
+ // PageCursorImpl.log.info(message);
System.out.println(message);
}
@@ -77,6 +78,8 @@
private volatile PagePosition lastPosition;
+ private volatile PagePosition lastAckedPosition;
+
private List<PagePosition> recoveredACK;
private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
@@ -275,11 +278,29 @@
previousPos = pos;
}
+ this.lastAckedPosition = lastPosition;
+
recoveredACK.clear();
recoveredACK = null;
}
}
+
+ public void stop()
+ {
+ Future future = new Future();
+ executor.execute(future);
+ future.await(1000);
+ }
+ public void printDebug()
+ {
+ System.out.println("Debug information on PageCurorImpl- " + this);
+ for (PageCursorInfo info : consumedPages.values())
+ {
+ System.out.println(info);
+ }
+ }
+
/**
* @param page
* @return
@@ -315,7 +336,10 @@
// The only exception is on non storage events such as not matching messages
private void processACK(final PagePosition pos)
{
- System.out.println("Processing ack for " + pos);
+ if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0)
+ {
+ this.lastAckedPosition = pos;
+ }
PageCursorInfo info = getPageInfo(pos);
info.addACK(pos);
@@ -387,7 +411,14 @@
{
if (entry.getValue().isDone())
{
- completedPages.add(entry.getValue());
+ if (entry.getKey() == lastAckedPosition.getPageNr())
+ {
+ System.out.println("We can't clear page " + entry.getKey() + " now since it's the current page");
+ }
+ else
+ {
+ completedPages.add(entry.getValue());
+ }
}
}
}
@@ -396,27 +427,8 @@
{
PageCursorInfo info = completedPages.get(i);
- boolean firstLine = true;
for (PagePosition pos : info.acks)
{
- if (firstLine)
- {
- firstLine = false;
- // We only do this check at the first line
- PageCache cache = pos.getPageCache();
- // The live cache has a hard reference on the PagingStoreImpl,
- // So we are sure the reference would be filled on the PagePosition
- if (cache != null && cache.isLive())
- {
- completedPages.remove(i);
- break;
- }
- if (isTrace)
- {
- PageCursorImpl.trace("Cleaning ACK records on page " + info.getPageId());
- }
- }
-
if (pos.getRecordID() > 0)
{
store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
@@ -444,6 +456,7 @@
{
PageCursorImpl.trace("Removing page " + completePage.getPageId());
}
+ System.out.println("Removing page " + completePage.getPageId());
consumedPages.remove(completePage.getPageId());
}
}
@@ -475,6 +488,11 @@
// expressions
private final AtomicInteger confirmed = new AtomicInteger(0);
+ public String toString()
+ {
+ return "PageCursorInfo::PaeID=" + pageId + " numberOfMessage = " + numberOfMessages;
+ }
+
public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache)
{
this.pageId = pageId;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-14 09:43:10 UTC (rev 9786)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-14 21:40:19 UTC (rev 9787)
@@ -192,6 +192,11 @@
public void stop()
{
+ for (PageCursor cursor : activeCursors.values())
+ {
+ cursor.stop();
+ }
+
activeCursors.clear();
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-14 09:43:10 UTC (rev 9786)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-14 21:40:19 UTC (rev 9787)
@@ -25,6 +25,7 @@
import org.hornetq.core.paging.cursor.PageCursor;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.impl.PageCursorImpl;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
@@ -153,45 +154,61 @@
System.out.println("Number of pages = " + numberOfPages);
PageCursorProviderImpl cursorProvider = (PageCursorProviderImpl)this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
- cursorProvider.printDebug();
PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+
+ PageCache firstPage = cursorProvider.getPageCache(new PagePositionImpl(server.getPagingManager().getPageStore(ADDRESS).getFirstPage(), 0));
+
+ int firstPageSize = firstPage.getNumberOfMessages();
+ firstPage = null;
+
System.out.println("Cursor: " + cursor);
+ cursorProvider.printDebug();
+
for (int i = 0 ; i < 1000 ; i++)
{
Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
- cursorProvider.printDebug();
assertNotNull(msg);
assertEquals(i, msg.b.getIntProperty("key").intValue());
- if (i < 500)
+ if (i < firstPageSize)
{
cursor.ack(msg.a);
}
}
+ cursorProvider.printDebug();
+
+ // needs to clear the context since we are using the same thread over two distinct servers
+ // otherwise we will get the old executor on the factory
+ OperationContextImpl.clearContext();
- OperationContextImpl.getContext(null).waitCompletion();
-
server.stop();
server.start();
cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
- for (int i = 500; i < NUM_MESSAGES; i++)
+ for (int i = firstPageSize; i < NUM_MESSAGES; i++)
{
+ System.out.println("Received " + i);
Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ assertNotNull(msg);
assertEquals(i, msg.b.getIntProperty("key").intValue());
+
cursor.ack(msg.a);
+
+ OperationContextImpl.getContext(null).waitCompletion();
+
}
+
+ OperationContextImpl.getContext(null).waitCompletion();
+ ((PageCursorImpl)cursor).printDebug();
-
}
-
public void testRestartWithHoleOnAck() throws Exception
{
@@ -424,6 +441,7 @@
protected void setUp() throws Exception
{
super.setUp();
+ OperationContextImpl.clearContext();
System.out.println("Tmp:" + getTemporaryDir());
Configuration config = createDefaultConfig();
@@ -445,6 +463,7 @@
protected void tearDown() throws Exception
{
+ OperationContextImpl.clearContext();
server.stop();
super.tearDown();
}
13 years, 6 months
JBoss hornetq SVN: r9786 - in branches/hornetq-416/src/main/org/hornetq: core/protocol/core/impl/wireformat and 2 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-10-14 05:43:10 -0400 (Thu, 14 Oct 2010)
New Revision: 9786
Modified:
branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/ConnectionSetClientIDMessage.java
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
branches/hornetq-416/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
Log:
setClientID impl
Modified: branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-10-14 00:45:50 UTC (rev 9785)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-10-14 09:43:10 UTC (rev 9786)
@@ -78,10 +78,12 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_START;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.CONNECTION_SET_CLIENTID;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.ConnectionSetClientIDMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
@@ -486,6 +488,11 @@
packet = new SessionForceConsumerDelivery();
break;
}
+ case CONNECTION_SET_CLIENTID:
+ {
+ packet = new ConnectionSetClientIDMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-10-14 00:45:50 UTC (rev 9785)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-10-14 09:43:10 UTC (rev 9786)
@@ -563,4 +563,9 @@
{
clientID = cID;
}
+
+ public String getClientID()
+ {
+ return clientID;
+ }
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/ConnectionSetClientIDMessage.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/ConnectionSetClientIDMessage.java 2010-10-14 00:45:50 UTC (rev 9785)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/ConnectionSetClientIDMessage.java 2010-10-14 09:43:10 UTC (rev 9786)
@@ -24,6 +24,11 @@
public class ConnectionSetClientIDMessage extends PacketImpl
{
private String clientID;
+
+ public ConnectionSetClientIDMessage()
+ {
+ super(PacketImpl.CONNECTION_SET_CLIENTID);
+ }
public ConnectionSetClientIDMessage(String cID)
{
Modified: branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-10-14 00:45:50 UTC (rev 9785)
+++ branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-10-14 09:43:10 UTC (rev 9786)
@@ -742,6 +742,7 @@
obj.put("connectionID", connection.getID().toString());
obj.put("clientAddress", connection.getRemoteAddress());
obj.put("creationTime", connection.getCreationTime());
+ obj.put("clientID", connection.getClientID());
array.put(obj);
}
return array.toString();
Modified: branches/hornetq-416/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-10-14 00:45:50 UTC (rev 9785)
+++ branches/hornetq-416/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-10-14 09:43:10 UTC (rev 9786)
@@ -13,6 +13,7 @@
package org.hornetq.spi.core.protocol;
+import java.util.Collection;
import java.util.List;
import org.hornetq.api.core.HornetQBuffer;
@@ -172,4 +173,10 @@
*/
void setClientID(String clientID);
+ /**
+ * get the client id
+ * @return the clientID
+ */
+ String getClientID();
+
}
13 years, 6 months
JBoss hornetq SVN: r9785 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-10-13 20:45:50 -0400 (Wed, 13 Oct 2010)
New Revision: 9785
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Implementing first step on cleanup after a whole page is consumed
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2010-10-13 15:43:57 UTC (rev 9784)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/LivePageCacheImpl.java 2010-10-14 00:45:50 UTC (rev 9785)
@@ -38,6 +38,11 @@
private final Page page;
private boolean isLive = true;
+
+ public String toString()
+ {
+ return "LivePacheCacheImpl::page=" + page.getPageId() + " number of messages=" + messages.size() + " isLive = " + isLive;
+ }
// Static --------------------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-13 15:43:57 UTC (rev 9784)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java 2010-10-14 00:45:50 UTC (rev 9785)
@@ -115,8 +115,13 @@
*/
public boolean isLive()
{
- return true;
+ return false;
}
+
+ public String toString()
+ {
+ return "PageCacheImpl::page=" + page.getPageId() + " numberOfMessages = " + messages.length;
+ }
// Package protected ---------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-13 15:43:57 UTC (rev 9784)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java 2010-10-14 00:45:50 UTC (rev 9785)
@@ -13,6 +13,7 @@
package org.hornetq.core.paging.cursor.impl;
+import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -56,6 +57,14 @@
// Attributes ----------------------------------------------------
+ private final boolean isTrace = true; //PageCursorImpl.log.isTraceEnabled();
+
+ private static void trace(final String message)
+ {
+ //PageCursorImpl.log.info(message);
+ System.out.println(message);
+ }
+
private final StorageManager store;
private final long cursorId;
@@ -190,6 +199,7 @@
*/
public void reloadACK(final PagePosition position)
{
+ System.out.println("reloading " + position);
if (recoveredACK == null)
{
recoveredACK = new LinkedList<PagePosition>();
@@ -211,7 +221,10 @@
{
if (recoveredACK != null)
{
- System.out.println("********** processing reload!!!!!!!");
+ if (isTrace)
+ {
+ PageCursorImpl.trace("********** processing reload!!!!!!!");
+ }
Collections.sort(recoveredACK);
PagePosition previousPos = null;
@@ -260,7 +273,6 @@
}
previousPos = pos;
- System.out.println("pos: " + pos);
}
recoveredACK.clear();
@@ -272,14 +284,15 @@
* @param page
* @return
*/
- private PageCursorInfo getPageInfo(final PagePosition pos)
+ private synchronized PageCursorInfo getPageInfo(final PagePosition pos)
{
PageCursorInfo pageInfo = consumedPages.get(pos.getPageNr());
if (pageInfo == null)
{
PageCache cache = cursorProvider.getPageCache(pos);
- pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages());
+ System.out.println("Number of Messages = " + cache.getNumberOfMessages());
+ pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(), cache);
consumedPages.put(pos.getPageNr(), pageInfo);
}
@@ -302,6 +315,7 @@
// The only exception is on non storage events such as not matching messages
private void processACK(final PagePosition pos)
{
+ System.out.println("Processing ack for " + pos);
PageCursorInfo info = getPageInfo(pos);
info.addACK(pos);
@@ -338,8 +352,6 @@
*/
private void onPageDone(final PageCursorInfo info)
{
- System.out.println("Page " + info.getPageId() + " has completed");
-
executor.execute(new Runnable()
{
@@ -351,7 +363,7 @@
}
catch (Exception e)
{
- PageCursorImpl.log.warn("Error on cleaning up cursor pages");
+ PageCursorImpl.log.warn("Error on cleaning up cursor pages", e);
}
}
});
@@ -368,7 +380,7 @@
final ArrayList<PageCursorInfo> completedPages = new ArrayList<PageCursorInfo>();
- // First get the completed pages using a lock
+ // First get the completed pages using a lock
synchronized (this)
{
for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
@@ -380,10 +392,31 @@
}
}
- for (PageCursorInfo info : completedPages)
+ for (int i = 0; i < completedPages.size(); i++)
{
+ PageCursorInfo info = completedPages.get(i);
+
+ boolean firstLine = true;
for (PagePosition pos : info.acks)
{
+ if (firstLine)
+ {
+ firstLine = false;
+ // We only do this check at the first line
+ PageCache cache = pos.getPageCache();
+ // The live cache has a hard reference on the PagingStoreImpl,
+ // So we are sure the reference would be filled on the PagePosition
+ if (cache != null && cache.isLive())
+ {
+ completedPages.remove(i);
+ break;
+ }
+ if (isTrace)
+ {
+ PageCursorImpl.trace("Cleaning ACK records on page " + info.getPageId());
+ }
+ }
+
if (pos.getRecordID() > 0)
{
store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
@@ -407,7 +440,10 @@
{
for (PageCursorInfo completePage : completedPages)
{
- System.out.println("Removing page " + completePage.getPageId());
+ if (isTrace)
+ {
+ PageCursorImpl.trace("Removing page " + completePage.getPageId());
+ }
consumedPages.remove(completePage.getPageId());
}
}
@@ -430,19 +466,29 @@
// Confirmed ACKs on this page
private final List<PagePosition> acks = Collections.synchronizedList(new LinkedList<PagePosition>());
+ private WeakReference<PageCache> cache;
+
+ // The page was live at the time of the creation
+ private final boolean wasLive;
+
// We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or
// expressions
private final AtomicInteger confirmed = new AtomicInteger(0);
- public PageCursorInfo(final long pageId, final int numberOfMessages)
+ public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache)
{
this.pageId = pageId;
this.numberOfMessages = numberOfMessages;
+ wasLive = cache.isLive();
+ if (wasLive)
+ {
+ this.cache = new WeakReference<PageCache>(cache);
+ }
}
public boolean isDone()
{
- return numberOfMessages == confirmed.get();
+ return getNumberOfMessages() == confirmed.get();
}
/**
@@ -461,12 +507,43 @@
acks.add(posACK);
}
- if (numberOfMessages == confirmed.incrementAndGet())
+ if (isTrace)
{
+ PageCursorImpl.trace("numberOfMessages = " + getNumberOfMessages() +
+ " confirmed = " +
+ (confirmed.get() + 1) +
+ ", page = " +
+ pageId);
+ }
+
+ if (getNumberOfMessages() == confirmed.incrementAndGet())
+ {
onPageDone(this);
}
}
+ private int getNumberOfMessages()
+ {
+ if (wasLive)
+ {
+ PageCache cache = this.cache.get();
+ if (cache != null)
+ {
+ return cache.getNumberOfMessages();
+ }
+ else
+ {
+ cache = cursorProvider.getPageCache(new PagePositionImpl(pageId, 0));
+ this.cache = new WeakReference<PageCache>(cache);
+ return cache.getNumberOfMessages();
+ }
+ }
+ else
+ {
+ return numberOfMessages;
+ }
+ }
+
}
static class PageCursorTX implements TransactionOperation
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-13 15:43:57 UTC (rev 9784)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-10-14 00:45:50 UTC (rev 9785)
@@ -194,6 +194,14 @@
{
activeCursors.clear();
}
+
+ public void printDebug()
+ {
+ for (PageCache cache: softCache.values())
+ {
+ System.out.println("Cache " + cache);
+ }
+ }
// Package protected ---------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-13 15:43:57 UTC (rev 9784)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2010-10-14 00:45:50 UTC (rev 9785)
@@ -15,7 +15,6 @@
import java.lang.ref.WeakReference;
-import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PagePosition;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-13 15:43:57 UTC (rev 9784)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-10-14 00:45:50 UTC (rev 9785)
@@ -559,13 +559,7 @@
SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
Page page = new PageImpl(storeName, storageManager, fileFactory, file, pageNumber);
-
- LivePageCache pageCache = new LivePageCacheImpl(page);
-
- page.setLiveCache(pageCache);
- cursorProvider.addPageCache(pageCache);
-
// To create the file
file.open();
@@ -1209,7 +1203,13 @@
}
currentPage = createPage(currentPageId);
+
+ LivePageCache pageCache = new LivePageCacheImpl(currentPage);
+
+ currentPage.setLiveCache(pageCache);
+ cursorProvider.addPageCache(pageCache);
+
currentPageSize.set(0);
currentPage.open();
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-13 15:43:57 UTC (rev 9784)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-10-14 00:45:50 UTC (rev 9785)
@@ -152,15 +152,18 @@
System.out.println("Number of pages = " + numberOfPages);
- PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
- System.out.println("cursorProvider = " + cursorProvider);
+ PageCursorProviderImpl cursorProvider = (PageCursorProviderImpl)this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+ cursorProvider.printDebug();
+
PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
System.out.println("Cursor: " + cursor);
for (int i = 0 ; i < 1000 ; i++)
{
Pair<PagePosition, ServerMessage> msg = cursor.moveNext();
+ cursorProvider.printDebug();
+ assertNotNull(msg);
assertEquals(i, msg.b.getIntProperty("key").intValue());
if (i < 500)
@@ -328,7 +331,7 @@
assertNotNull(readMessage);
- // TODO: ack on live data
+ cursor.ack(readMessage.a);
assertEquals(i, readMessage.b.getIntProperty("key").intValue());
13 years, 6 months
JBoss hornetq SVN: r9784 - in branches/hornetq-416: src/main/org/hornetq/api/core/client and 11 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-10-13 11:43:57 -0400 (Wed, 13 Oct 2010)
New Revision: 9784
Added:
branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/ConnectionSetClientIDMessage.java
Modified:
branches/hornetq-416/.project
branches/hornetq-416/src/main/org/hornetq/api/core/client/ClientSession.java
branches/hornetq-416/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/hornetq-416/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/hornetq-416/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/hornetq-416/src/main/org/hornetq/core/remoting/server/RemotingService.java
branches/hornetq-416/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java
branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java
branches/hornetq-416/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
branches/hornetq-416/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
Log:
setClientID
Modified: branches/hornetq-416/.project
===================================================================
--- branches/hornetq-416/.project 2010-10-13 08:54:31 UTC (rev 9783)
+++ branches/hornetq-416/.project 2010-10-13 15:43:57 UTC (rev 9784)
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
- <name>trunk</name>
+ <name>hornetq416</name>
<comment></comment>
<projects>
</projects>
Modified: branches/hornetq-416/src/main/org/hornetq/api/core/client/ClientSession.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/core/client/ClientSession.java 2010-10-13 08:54:31 UTC (rev 9783)
+++ branches/hornetq-416/src/main/org/hornetq/api/core/client/ClientSession.java 2010-10-13 15:43:57 UTC (rev 9784)
@@ -562,4 +562,10 @@
*/
void setSendAcknowledgementHandler(SendAcknowledgementHandler handler);
+ /**
+ * Sets ClientID of the associated JMS connection.
+ * @param clientID the client ID
+ */
+ void setClientID(String clientID);
+
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-10-13 08:54:31 UTC (rev 9783)
+++ branches/hornetq-416/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-10-13 15:43:57 UTC (rev 9784)
@@ -39,6 +39,7 @@
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.ConnectionSetClientIDMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionMessage;
@@ -1810,4 +1811,10 @@
}
}
+
+ public void setClientID(String clientID)
+ {
+ ConnectionSetClientIDMessage msg = new ConnectionSetClientIDMessage(clientID);
+ channel.send(msg);
+ }
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-10-13 08:54:31 UTC (rev 9783)
+++ branches/hornetq-416/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-10-13 15:43:57 UTC (rev 9784)
@@ -555,4 +555,9 @@
{
session.setPacketSize(packetSize);
}
+
+ public void setClientID(String clientID)
+ {
+ session.setClientID(clientID);
+ }
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-10-13 08:54:31 UTC (rev 9783)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-10-13 15:43:57 UTC (rev 9784)
@@ -57,6 +57,7 @@
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.ConnectionSetClientIDMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
@@ -460,6 +461,11 @@
session.requestProducerCredits(message.getAddress(), message.getCredits());
break;
}
+ case PacketImpl.CONNECTION_SET_CLIENTID:
+ {
+ ConnectionSetClientIDMessage message = (ConnectionSetClientIDMessage)packet;
+ session.setConnectionClientID(message.getClientID());
+ }
}
}
catch (HornetQXAException e)
Modified: branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-10-13 08:54:31 UTC (rev 9783)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-10-13 15:43:57 UTC (rev 9784)
@@ -182,6 +182,8 @@
public static final byte REPLICATION_SYNC = 103;
+ public static final byte CONNECTION_SET_CLIENTID = 104;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Modified: branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-10-13 08:54:31 UTC (rev 9783)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-10-13 15:43:57 UTC (rev 9784)
@@ -90,6 +90,8 @@
private volatile boolean executing;
private final long creationTime;
+
+ private String clientID;
// Constructors
// ---------------------------------------------------------------------------------
@@ -556,4 +558,9 @@
channel.close();
}
}
+
+ public void setClientID(String cID)
+ {
+ clientID = cID;
+ }
}
Added: branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/ConnectionSetClientIDMessage.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/ConnectionSetClientIDMessage.java (rev 0)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/ConnectionSetClientIDMessage.java 2010-10-13 15:43:57 UTC (rev 9784)
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * A ConnectionSetClientIDMessage
+ *
+ *
+ */
+public class ConnectionSetClientIDMessage extends PacketImpl
+{
+ private String clientID;
+
+ public ConnectionSetClientIDMessage(String cID)
+ {
+ super(PacketImpl.CONNECTION_SET_CLIENTID);
+ clientID = cID;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeString(clientID);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ clientID = buffer.readString();
+ }
+
+ public String getClientID()
+ {
+ return clientID;
+ }
+
+}
Modified: branches/hornetq-416/src/main/org/hornetq/core/remoting/server/RemotingService.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/remoting/server/RemotingService.java 2010-10-13 08:54:31 UTC (rev 9783)
+++ branches/hornetq-416/src/main/org/hornetq/core/remoting/server/RemotingService.java 2010-10-13 15:43:57 UTC (rev 9784)
@@ -45,4 +45,6 @@
void freeze();
RemotingConnection getServerSideReplicatingConnection();
+
+ void setConnectionClientID(Object connID, String clientID);
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-10-13 08:54:31 UTC (rev 9783)
+++ branches/hornetq-416/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-10-13 15:43:57 UTC (rev 9784)
@@ -570,4 +570,10 @@
}
}
}
+
+ public void setConnectionClientID(Object connID, String clientID)
+ {
+ ConnectionEntry conn = connections.get(connID);
+ conn.connection.setClientID(clientID);
+ }
}
\ No newline at end of file
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java 2010-10-13 08:54:31 UTC (rev 9783)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java 2010-10-13 15:43:57 UTC (rev 9784)
@@ -113,4 +113,6 @@
void setTransferring(boolean transferring);
Set<ServerConsumer> getServerConsumers();
+
+ void setConnectionClientID(String clientID);
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-10-13 08:54:31 UTC (rev 9783)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-10-13 15:43:57 UTC (rev 9784)
@@ -1183,4 +1183,9 @@
routingContext.clear();
}
+ public void setConnectionClientID(String clientID)
+ {
+ this.server.getRemotingService().setConnectionClientID(this.getConnectionID(), clientID);
+ }
+
}
Modified: branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-10-13 08:54:31 UTC (rev 9783)
+++ branches/hornetq-416/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-10-13 15:43:57 UTC (rev 9784)
@@ -181,6 +181,7 @@
}
this.clientID = clientID;
+ initialSession.setClientID(clientID);
justCreated = false;
}
Modified: branches/hornetq-416/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-10-13 08:54:31 UTC (rev 9783)
+++ branches/hornetq-416/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-10-13 15:43:57 UTC (rev 9784)
@@ -165,4 +165,11 @@
*/
void flush();
+ /**
+ * Set the connection's clientID
+ *
+ * @param clientID the clientID
+ */
+ void setClientID(String clientID);
+
}
Modified: branches/hornetq-416/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
===================================================================
--- branches/hornetq-416/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2010-10-13 08:54:31 UTC (rev 9783)
+++ branches/hornetq-416/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2010-10-13 15:43:57 UTC (rev 9784)
@@ -1390,6 +1390,15 @@
// TODO Auto-generated method stub
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ClientSession#setClientID(java.lang.String)
+ */
+ public void setClientID(String clientID)
+ {
+ // TODO Auto-generated method stub
+
+ }
}
}
13 years, 6 months