Author: gaohoward
Date: 2010-11-01 06:17:23 -0400 (Mon, 01 Nov 2010)
New Revision: 9826
Added:
branches/hornetq-416/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/SelectorTest.java
Modified:
branches/hornetq-416/build-hornetq.properties
branches/hornetq-416/build-hornetq.xml
branches/hornetq-416/docs/user-manual/en/configuration-index.xml
branches/hornetq-416/docs/user-manual/en/embedding-hornetq.xml
branches/hornetq-416/examples/common/build.xml
branches/hornetq-416/examples/core/twitter-connector/build.xml
branches/hornetq-416/examples/core/twitter-connector/server0/hornetq-configuration.xml
branches/hornetq-416/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java
branches/hornetq-416/hornetq-rest/docbook/reference/en/master.xml
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/Hornetq.java
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/Jms.java
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/AcknowledgedQueueConsumer.java
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumedHttpMessage.java
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueConsumer.java
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/xml/PushRegistration.java
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/AcknowledgedSubscriptionResource.java
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionResource.java
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/HttpMessageHelper.java
branches/hornetq-416/pom.xml
branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/hornetq-416/src/main/org/hornetq/integration/twitter/TwitterConstants.java
branches/hornetq-416/src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java
branches/hornetq-416/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java
branches/hornetq-416/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java
Log:
back merge from trunk
Modified: branches/hornetq-416/build-hornetq.properties
===================================================================
--- branches/hornetq-416/build-hornetq.properties 2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/build-hornetq.properties 2010-11-01 10:17:23 UTC (rev 9826)
@@ -9,6 +9,7 @@
javac.include.ant.runtime=false
javac.include.java.runtime=true
javac.fail.onerror=true
+javac.encoding=utf-8
# JUnit properties
junit.showoutput=true
Modified: branches/hornetq-416/build-hornetq.xml
===================================================================
--- branches/hornetq-416/build-hornetq.xml 2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/build-hornetq.xml 2010-11-01 10:17:23 UTC (rev 9826)
@@ -449,6 +449,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${build.src.dir}"/>
@@ -493,6 +494,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${build.src.dir}"/>
@@ -531,6 +533,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${src.main.dir}"/>
@@ -551,6 +554,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${src.main.dir}"/>
@@ -571,6 +575,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${src.main.dir}"/>
@@ -591,6 +596,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${src.main.dir}"/>
@@ -611,6 +617,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${src.main.dir}"/>
@@ -632,6 +639,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${src.main.dir}"/>
@@ -652,6 +660,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${src.main.dir}"/>
@@ -672,6 +681,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${src.main.dir}"/>
@@ -692,6 +702,7 @@
deprecation="${javac.deprecation}"
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
failonerror="${javac.fail.onerror}">
<src>
<pathelement path="${build.src.dir}"/>
@@ -1586,6 +1597,7 @@
includeAntRuntime="${javac.include.ant.runtime}"
includeJavaRuntime="${javac.include.java.runtime}"
failonerror="${javac.fail.onerror}"
+ encoding="${javac.encoding}"
srcdir="${test.src.dir}"
destdir="${test.classes.dir}">
<classpath refid="test.compilation.classpath"/>
@@ -1604,6 +1616,7 @@
includeAntRuntime="true"
includeJavaRuntime="${javac.include.java.runtime}"
failonerror="${javac.fail.onerror}"
+ encoding="${javac.encoding}"
srcdir="${test.jms.src.dir}"
destdir="${test.jms.classes.dir}">
<classpath refid="jms.test.compilation.classpath"/>
@@ -1622,6 +1635,7 @@
includeAntRuntime="true"
includeJavaRuntime="${javac.include.java.runtime}"
failonerror="${javac.fail.onerror}"
+ encoding="${javac.encoding}"
srcdir="${test.joram.src.dir}"
destdir="${test.joram.classes.dir}">
<classpath refid="joram.test.compilation.classpath"/>
@@ -1696,8 +1710,10 @@
timeout="${junit.timeout}">
<sysproperty key="user.home" value="${user.home}"/>
<sysproperty key="java.io.tmpdir"
value="${java.io.tmpdir}"/>
- <sysproperty key="twitter.username"
value="${twitter.username}"/>
- <sysproperty key="twitter.password"
value="${twitter.password}"/>
+ <sysproperty key="twitter.consumerKey"
value="${twitter.consumerKey}"/>
+ <sysproperty key="twitter.consumerSecret"
value="${twitter.consumerSecret}"/>
+ <sysproperty key="twitter.accessToken"
value="${twitter.accessToken}"/>
+ <sysproperty key="twitter.accessTokenSecret"
value="${twitter.accessTokenSecret}"/>
<jvmarg value="-Djava.library.path=native/bin"/>
<jvmarg value="-Dmodule.output=./"/>
<jvmarg
value="-Djava.util.logging.config.file=src/config/trunk/non-clustered/logging.properties"/>
Modified: branches/hornetq-416/docs/user-manual/en/configuration-index.xml
===================================================================
--- branches/hornetq-416/docs/user-manual/en/configuration-index.xml 2010-11-01 09:44:00
UTC (rev 9825)
+++ branches/hornetq-416/docs/user-manual/en/configuration-index.xml 2010-11-01 10:17:23
UTC (rev 9826)
@@ -1026,7 +1026,7 @@
<entry>generic</entry>
</row>
<row>
- <entry
id="configuration.connection-factory.signature">
+ <entry
id="configuration.connection-factory.signature.xa">
<link
linkend="using-jms.configure.factory.types">connection-factory.xa</link>
</entry>
<entry>Boolean</entry>
Modified: branches/hornetq-416/docs/user-manual/en/embedding-hornetq.xml
===================================================================
--- branches/hornetq-416/docs/user-manual/en/embedding-hornetq.xml 2010-11-01 09:44:00 UTC
(rev 9825)
+++ branches/hornetq-416/docs/user-manual/en/embedding-hornetq.xml 2010-11-01 10:17:23 UTC
(rev 9826)
@@ -46,19 +46,8 @@
two different helper classes for this depending on whether your using the
HornetQ Core API or JMS.</para>
-<<<<<<< .mine
<section>
<title>Core API Only</title>
-=======
-config.setAcceptorConfigurations(transports);</programlisting>
- <para>You need to instantiate and start HornetQ server. The class
<literal
- >org.hornetq.api.core.server.HornetQ</literal> has a few static
methods for creating
- servers with common configurations.</para>
- <programlisting>import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServers;
-
->>>>>>> .r9629
-
<para>For instantiating a core HornetQ Server only, the steps are pretty
simple. The example requires that you have defined a configuration file
<literal>hornetq-configuration.xml</literal> in your
@@ -66,38 +55,9 @@
...
-<<<<<<< .mine
EmbeddedHornetQ embedded = new EmbeddedHornetQ();
embedded.start();
-=======
-HornetQServer server = HornetQServers.newHornetQServer(config);
->>>>>>> .r9629
-<<<<<<< .mine
-// Assuming you defined an "in-vm" acceptor within your
hornetq-configuration.xml file
-=======
-server.start();</programlisting>
- <para>You also have the option of instantiating
<literal>HornetQServerImpl</literal>
- directly:</para>
- <programlisting>HornetQServer server = new HornetQServerImpl(config);
-server.start();</programlisting>
- </section>
- <section>
- <title>Dependency Frameworks</title>
- <para>You may also choose to use a dependency injection framework such as
<trademark>JBoss
- Micro Container</trademark> or <trademark>Spring
Framework</trademark>.</para>
- <para>HornetQ standalone uses JBoss Micro Container as the injection
framework. <literal
- >HornetQBootstrapServer</literal> and
<literal>hornetq-beans.xml</literal> which are
- part of the HornetQ distribution provide a very complete implementation of
what's needed
- to bootstrap the server using JBoss Micro Container. </para>
- <para>When using JBoss Micro Container, you need to provide an XML file
declaring the
- <literal>HornetQServer</literal> and
<literal>Configuration</literal> object, you
- can also inject a security manager and a MBean server if you want, but those
are
- optional.</para>
- <para>A very basic XML Bean declaration for the JBoss Micro Container would
be:</para>
- <programlisting><?xml version="1.0"
encoding="UTF-8"?>
->>>>>>> .r9629
-
ClientSessionFactory nettyFactory = HornetQClient.createClientSessionFactory(
new TransportConfiguration(
InVMConnectorFactory.class.getName()));
Modified: branches/hornetq-416/examples/common/build.xml
===================================================================
--- branches/hornetq-416/examples/common/build.xml 2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/examples/common/build.xml 2010-11-01 10:17:23 UTC (rev 9826)
@@ -140,6 +140,7 @@
<!--<echo>client classpath = ${clientClasspath}</echo>-->
<property file="${imported.basedir}/config/server.properties"/>
<java classname="${example.classname}" fork="true"
resultproperty="example-result">
+ <jvmarg line="${client.args}"/>
<jvmarg
value="-Dhornetq.example.server.classpath=${serverclasspath}"/>
<jvmarg value="-Dhornetq.example.server.args=${server.args}"/>
<jvmarg
value="-Dhornetq.example.logserveroutput=${hornetq.example.logserveroutput}"/>
Modified: branches/hornetq-416/examples/core/twitter-connector/build.xml
===================================================================
--- branches/hornetq-416/examples/core/twitter-connector/build.xml 2010-11-01 09:44:00 UTC
(rev 9825)
+++ branches/hornetq-416/examples/core/twitter-connector/build.xml 2010-11-01 10:17:23 UTC
(rev 9826)
@@ -19,10 +19,13 @@
<import file="../../common/build.xml"/>
<property environment='env'/>
- <target name="check" unless="env.TWITTER_USERNAME">
+ <target name="check" unless="env.TWITTER_CONSUMER_KEY">
<echo>**************************************************************************</echo>
<echo>* Please set the twitter account:
*</echo>
- <echo>* ./build.sh -Denv.TWITTER_USERNAME=user
-Denv.TWITTER_PASSWORD=password *</echo>
+ <echo>* ./build.sh -Denv.TWITTER_CONSUMER_KEY=consumerKey \
*</echo>
+ <echo>* -Denv.TWITTER_CONSUMER_SECRET=consumerSecret \
*</echo>
+ <echo>* -Denv.TWITTER_ACCESS_TOKEN=accessToken \
*</echo>
+ <echo>* -Denv.TWITTER_ACCESS_TOKEN_SECRET=accessTokenSecret
*</echo>
<echo>**************************************************************************</echo>
<fail message="run example failed"/>
</target>
@@ -30,10 +33,11 @@
<target name="run" depends="check">
<antcall target="runExample">
<param name="example.classname"
value="org.hornetq.core.example.TwitterConnectorExample"/>
+ <param name="client.args"
value="-Dtwitter.example.alternativeMessage=${env.TWITTER_EXAMPLE_ALTERNATIVE_MESSAGE}"/>
<!-- HTTP proxy settings
<param name="server.args"
value="-Dtwitter4j.http.proxyHost=your.proxy.server
-Dtwitter4j.http.proxyPort=your.proxy.port"/>
-->
- <param name="server.args"
value="-Dtwitter.username=${env.TWITTER_USERNAME}
-Dtwitter.password=${env.TWITTER_PASSWORD}"/>
+ <param name="server.args"
value="-Dtwitter.consumerKey=${env.TWITTER_CONSUMER_KEY}
-Dtwitter.consumerSecret=${env.TWITTER_CONSUMER_SECRET}
-Dtwitter.accessToken=${env.TWITTER_ACCESS_TOKEN}
-Dtwitter.accessTokenSecret=${env.TWITTER_ACCESS_TOKEN_SECRET}"/>
</antcall>
</target>
Modified:
branches/hornetq-416/examples/core/twitter-connector/server0/hornetq-configuration.xml
===================================================================
---
branches/hornetq-416/examples/core/twitter-connector/server0/hornetq-configuration.xml 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/examples/core/twitter-connector/server0/hornetq-configuration.xml 2010-11-01
10:17:23 UTC (rev 9826)
@@ -13,7 +13,7 @@
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">
-
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
+
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
</acceptor>
</acceptors>
@@ -44,15 +44,19 @@
<connector-service name="my-incoming-tweets">
<factory-class>org.hornetq.integration.twitter.TwitterIncomingConnectorServiceFactory</factory-class>
<param key="queue" value="queue.incomingQueue"/>
- <param key="username" value="${twitter.username}"/>
- <param key="password" value="${twitter.password}"/>
+ <param key="consumerKey"
value="${twitter.consumerKey}"/>
+ <param key="consumerSecret"
value="${twitter.consumerSecret}"/>
+ <param key="accessToken"
value="${twitter.accessToken}"/>
+ <param key="accessTokenSecret"
value="${twitter.accessTokenSecret}"/>
<param key="interval" value="60"/>
</connector-service>
<connector-service name="my-outgoing-tweets">
<factory-class>org.hornetq.integration.twitter.TwitterOutgoingConnectorServiceFactory</factory-class>
<param key="queue" value="queue.outgoingQueue"/>
- <param key="username" value="${twitter.username}"/>
- <param key="password" value="${twitter.password}"/>
+ <param key="consumerKey"
value="${twitter.consumerKey}"/>
+ <param key="consumerSecret"
value="${twitter.consumerSecret}"/>
+ <param key="accessToken"
value="${twitter.accessToken}"/>
+ <param key="accessTokenSecret"
value="${twitter.accessTokenSecret}"/>
</connector-service>
</connector-services>
Modified:
branches/hornetq-416/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java
===================================================================
---
branches/hornetq-416/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -44,6 +44,11 @@
ClientSession session = null;
try
{
+ String testMessage = System.currentTimeMillis() + ": " +
System.getProperty("twitter.example.alternativeMessage");
+ if(testMessage == null || testMessage.trim().equals("")) {
+ testMessage = System.currentTimeMillis() + ": ### Hello, HornetQ fans!!
We are now experiencing so fast, so reliable and so exciting messaging never seen before
;-) ###";
+ }
+
// Step 1. Create a ClientSessionFactory.
csf = HornetQClient.createClientSessionFactory (new
TransportConfiguration(NettyConnectorFactory.class.getName()));
@@ -58,7 +63,6 @@
// Step 5. Create a core message.
ClientMessage cm =
session.createMessage(org.hornetq.api.core.Message.TEXT_TYPE,true);
- String testMessage = System.currentTimeMillis() + ": ### Hello, HornetQ
fans!! We are now experiencing so fast, so reliable and so exciting messaging never seen
before ;-) ###";
cm.getBodyBuffer().writeString(testMessage);
// Step 6. Send a message to queue.outgoingQueue.
@@ -76,13 +80,21 @@
ClientMessage received = cc.receive(70 * 1000);
received.acknowledge();
String receivedText = received.getBodyBuffer().readString();
- System.out.println("#### Received a message from " + INCOMING_QUEUE +
": " + receivedText);
- if(!receivedText.equals(testMessage))
+ while(!receivedText.equals(testMessage))
{
- return false;
+ // ignoring other tweets
+ received = cc.receiveImmediate();
+ if(received == null) {
+ // no other tweets. test message has gone...
+ return false;
+ }
+
+ received.acknowledge();
+ receivedText = received.getBodyBuffer().readString();
}
-
+
+ System.out.println("#### Received a message from " + INCOMING_QUEUE +
": " + receivedText);
return true;
}
finally
Modified: branches/hornetq-416/hornetq-rest/docbook/reference/en/master.xml
===================================================================
--- branches/hornetq-416/hornetq-rest/docbook/reference/en/master.xml 2010-11-01 09:44:00
UTC (rev 9825)
+++ branches/hornetq-416/hornetq-rest/docbook/reference/en/master.xml 2010-11-01 10:17:23
UTC (rev 9826)
@@ -870,6 +870,16 @@
the server. Only usable on topics.</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term>selector</term>
+
+ <listitem>
+ <para>This is an optional JMS selector string. The HornetQ REST
+ interface adds HTTP headers to the JMS message for REST produced
+ messages. HTTP headers are prefixed with "http_" and every
'-'
+ charactor is converted to a '$'.</para>
+ </listitem>
+ </varlistentry>
</variablelist>
<sect1>
@@ -1480,6 +1490,10 @@
<programlisting><push-registration>
<durable>false</durable>
+ <selector><![CDATA[
+ SomeAttribute > 1
+ ]]>
+ </selector>
<link rel="push" href="http://somewhere.com"
type="application/json" method="PUT"/>
</push-registration>
</programlisting>
@@ -1493,6 +1507,10 @@
<literal>queue-push-store-dir</literal> config variable defined in
Chapter 2. (<literal>topic-push-store-dir</literal> for
topics).</para>
+ <para>The <literal>selector</literal> element is optional and
defines a
+ JMS message selector. You should enclose it within CDATA blocks as some
+ of the selector characters are illegal XML.</para>
+
<para>The <literal>link</literal> element specifies the basis of
the
interaction. The <literal>href</literal> attribute contains the URL
you
want to interact with. It is the only required attribute. The
@@ -1562,11 +1580,16 @@
<title>The Topic Push Subscription XML</title>
<para>The push XML for a topic is the same except the root element is
- push-topic-registration. The rest of the document is the same. Here's an
+ push-topic-registration. (Also remember the
<literal>selector</literal>
+ element is optional). The rest of the document is the same. Here's an
example of a template registration:</para>
<programlisting><push-topic-registration>
<durable>true</durable>
+ <selector><![CDATA[
+ SomeAttribute > 1
+ ]]>
+ </selector>
<link rel="template"
href="http://somewhere.com/resources/{id}/messages"
method="POST"/>
</push-topic registration></programlisting>
</sect1>
Modified:
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/Hornetq.java
===================================================================
---
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/Hornetq.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/Hornetq.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -130,7 +130,7 @@
public static <T> T getEntity(ClientMessage msg, Class<T> type, Type
genericType, ResteasyProviderFactory factory)
{
- int size = msg.getBodyBuffer().readInt();
+ int size = msg.getBodySize();
if (size <= 0) return null;
byte[] body = new byte[size];
Modified:
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/Jms.java
===================================================================
---
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/Jms.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/Jms.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -2,12 +2,18 @@
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.rest.util.HttpMessageHelper;
import org.jboss.resteasy.spi.ResteasyProviderFactory;
import org.jboss.resteasy.util.GenericType;
+import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.ext.MessageBodyReader;
+
+import java.io.ByteArrayInputStream;
import java.lang.reflect.Type;
/**
@@ -99,8 +105,12 @@
public static boolean isHttpMessage(Message message)
{
- ClientMessage msg = ((HornetQMessage) message).getCoreMessage();
- return Hornetq.isHttpMessage(msg);
+ try {
+ Boolean aBoolean =
message.getBooleanProperty(HttpMessageHelper.POSTED_AS_HTTP_MESSAGE);
+ return aBoolean != null && aBoolean.booleanValue() == true;
+ } catch (JMSException e) {
+ return false;
+ }
}
/**
@@ -128,8 +138,33 @@
throw new RuntimeException(e);
}
}
- ClientMessage msg = ((HornetQMessage) message).getCoreMessage();
- return Hornetq.getEntity(msg, type, genericType, factory);
+ BytesMessage bytesMessage = (BytesMessage)message;
+
+ try
+ {
+ long size = bytesMessage.getBodyLength();
+ if (size <= 0) return null;
+
+ byte[] body = new byte[(int)size];
+ bytesMessage.readBytes(body);
+
+ String contentType = message.getStringProperty(HttpHeaderProperty.CONTENT_TYPE);
+ if (contentType == null)
+ {
+ throw new UnknownMediaType("Message did not have a Content-Type header
cannot extract entity");
+ }
+ MediaType ct = MediaType.valueOf(contentType);
+ MessageBodyReader<T> reader = factory.getMessageBodyReader(type,
genericType, null, ct);
+ if (reader == null)
+ {
+ throw new UnmarshalException("Unable to find a JAX-RS reader for type "
+ type.getName() + " and media type " + contentType);
+ }
+ return reader.readFrom(type, genericType, null, ct, null, new
ByteArrayInputStream(body));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
Modified:
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/AcknowledgedQueueConsumer.java
===================================================================
---
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/AcknowledgedQueueConsumer.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/AcknowledgedQueueConsumer.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -31,10 +31,10 @@
protected String startup = Long.toString(System.currentTimeMillis());
protected volatile Acknowledgement ack;
- public AcknowledgedQueueConsumer(ClientSessionFactory factory, String destination,
String id, DestinationServiceManager serviceManager)
+ public AcknowledgedQueueConsumer(ClientSessionFactory factory, String destination,
String id, DestinationServiceManager serviceManager, String selector)
throws HornetQException
{
- super(factory, destination, id, serviceManager);
+ super(factory, destination, id, serviceManager, selector);
autoAck = false;
}
@@ -187,9 +187,7 @@
try
{
- session = factory.createSession();
- consumer = session.createConsumer(destination);
- session.start();
+ createSession();
}
catch (Exception e)
{
Modified:
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumedHttpMessage.java
===================================================================
---
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumedHttpMessage.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumedHttpMessage.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -23,7 +23,7 @@
buildHeaders(builder);
if (data == null)
{
- int size = message.getBodyBuffer().readInt();
+ int size = message.getBodySize();
if (size > 0)
{
data = new byte[size];
Modified:
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java
===================================================================
---
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -36,6 +36,9 @@
protected int consumerTimeoutSeconds;
protected DestinationServiceManager serviceManager;
+ protected static final int ACKNOWLEDGED = 0x01;
+ protected static final int SELECTOR_SET = 0x02;
+
public DestinationServiceManager getServiceManager()
{
return serviceManager;
@@ -78,6 +81,7 @@
private Object timeoutLock = new Object();
+ @Override
public void testTimeout(String target)
{
synchronized (timeoutLock)
@@ -107,32 +111,41 @@
@POST
public Response createSubscription(@FormParam("autoAck")
@DefaultValue("true") boolean autoAck,
+ @FormParam("selector") String selector,
@Context UriInfo uriInfo)
{
try
{
QueueConsumer consumer = null;
+ int attributes = 0;
+ if (selector != null)
+ {
+ attributes = attributes | SELECTOR_SET;
+ }
+
if (autoAck)
{
- consumer = createConsumer();
+ consumer = createConsumer(selector);
}
else
{
- consumer = createAcknowledgedConsumer();
+ attributes |= ACKNOWLEDGED;
+ consumer = createAcknowledgedConsumer(selector);
}
+ String attributesSegment = "attributes-" + attributes;
UriBuilder location = uriInfo.getAbsolutePathBuilder();
- if (autoAck) location.path("auto-ack");
- else location.path("acknowledged");
+ location.path(attributesSegment);
location.path(consumer.getId());
Response.ResponseBuilder builder = Response.created(location.build());
+
if (autoAck)
{
- QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder,
uriInfo, uriInfo.getMatchedURIs().get(1) + "/auto-ack/" + consumer.getId(),
"-1");
+ QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder,
uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment
+"/" + consumer.getId(), "-1");
}
else
{
-
AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(),
builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/acknowledged/" +
consumer.getId(), "-1");
+
AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(),
builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment
+"/" + consumer.getId(), "-1");
}
return builder.build();
@@ -147,11 +160,11 @@
}
}
- public QueueConsumer createConsumer()
+ public QueueConsumer createConsumer(String selector)
throws HornetQException
{
String genId = sessionCounter.getAndIncrement() + "-queue-" + destination
+ "-" + startup;
- QueueConsumer consumer = new QueueConsumer(sessionFactory, destination, genId,
serviceManager);
+ QueueConsumer consumer = new QueueConsumer(sessionFactory, destination, genId,
serviceManager, selector);
synchronized (timeoutLock)
{
queueConsumers.put(genId, consumer);
@@ -160,11 +173,11 @@
return consumer;
}
- public QueueConsumer createAcknowledgedConsumer()
+ public QueueConsumer createAcknowledgedConsumer(String selector)
throws HornetQException
{
String genId = sessionCounter.getAndIncrement() + "-queue-" + destination
+ "-" + startup;
- QueueConsumer consumer = new AcknowledgedQueueConsumer(sessionFactory, destination,
genId, serviceManager);
+ QueueConsumer consumer = new AcknowledgedQueueConsumer(sessionFactory, destination,
genId, serviceManager, selector);
synchronized (timeoutLock)
{
queueConsumers.put(genId, consumer);
@@ -173,85 +186,81 @@
return consumer;
}
- @Path("auto-ack/{consumer-id}")
+ @Path("attributes-{attributes}/{consumer-id}")
@GET
- public Response getConsumer(@PathParam("consumer-id") String consumerId,
+ public Response getConsumer(@PathParam("attributes") int attributes,
+ @PathParam("consumer-id") String consumerId,
@Context UriInfo uriInfo) throws Exception
{
- return headConsumer(consumerId, uriInfo);
+ return headConsumer(attributes, consumerId, uriInfo);
}
- @Path("auto-ack/{consumer-id}")
+ @Path("attributes-{attributes}/{consumer-id}")
@HEAD
- public Response headConsumer(@PathParam("consumer-id") String consumerId,
+ public Response headConsumer(@PathParam("attributes") int attributes,
+ @PathParam("consumer-id") String consumerId,
@Context UriInfo uriInfo) throws Exception
{
- QueueConsumer consumer = findConsumer(consumerId);
+ QueueConsumer consumer = findConsumer(attributes, consumerId, uriInfo);
Response.ResponseBuilder builder = Response.noContent();
// we synchronize just in case a failed request is still processing
synchronized (consumer)
{
- QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder,
uriInfo, uriInfo.getMatchedURIs().get(1) + "/acknowledged/" + consumer.getId(),
Long.toString(consumer.getConsumeIndex()));
+ if ( (attributes & ACKNOWLEDGED) > 0)
+ {
+ AcknowledgedQueueConsumer ackedConsumer =
(AcknowledgedQueueConsumer)consumer;
+ Acknowledgement ack = ackedConsumer.getAck();
+ if (ack == null || ack.wasSet())
+ {
+
AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(),
builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/attributes-" + attributes
+ "/" + consumer.getId(), Long.toString(consumer.getConsumeIndex()));
+ }
+ else
+ {
+ ackedConsumer.setAcknowledgementLink(builder, uriInfo,
uriInfo.getMatchedURIs().get(1) + "/attributes-" + attributes + "/" +
consumer.getId());
+ }
+
+ }
+ else
+ {
+ QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder,
uriInfo, uriInfo.getMatchedURIs().get(1) + "/attributes-" + attributes +
"/" + consumer.getId(), Long.toString(consumer.getConsumeIndex()));
+ }
}
return builder.build();
}
- @Path("auto-ack/{consumer-id}")
+ @Path("attributes-{attributes}/{consumer-id}")
public QueueConsumer findConsumer(
- @PathParam("consumer-id") String consumerId) throws Exception
+ @PathParam("attributes") int attributes,
+ @PathParam("consumer-id") String consumerId,
+ @Context UriInfo uriInfo) throws Exception
{
QueueConsumer consumer = queueConsumers.get(consumerId);
if (consumer == null)
{
- QueueConsumer tmp = new QueueConsumer(sessionFactory, destination, consumerId,
serviceManager);
- consumer = addConsumerToMap(consumerId, tmp);
- }
- return consumer;
- }
+ if ( (attributes & SELECTOR_SET) > 0)
+ {
- @Path("acknowledged/{consumer-id}")
- @GET
- public Response getAcknowledgedConsumer(@PathParam("consumer-id") String
consumerId,
- @Context UriInfo uriInfo) throws Exception
- {
- return headAcknowledgedConsumer(consumerId, uriInfo);
- }
+ Response.ResponseBuilder builder = Response.status(Response.Status.GONE)
+ .entity("Cannot reconnect to selector-based consumer. You must
recreate the consumer session.")
+ .type("text/plain");
+ UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
+ uriBuilder.path(uriInfo.getMatchedURIs().get(1));
+ serviceManager.getLinkStrategy().setLinkHeader(builder,
"pull-consumers", "pull-consumers", uriBuilder.build().toString(),
null);
+ throw new WebApplicationException(builder.build());
+
+ }
+ if ( (attributes & ACKNOWLEDGED) > 0)
+ {
+ QueueConsumer tmp = new AcknowledgedQueueConsumer(sessionFactory,
destination, consumerId, serviceManager, null);
+ consumer = addConsumerToMap(consumerId, tmp);
- @Path("acknowledged/{consumer-id}")
- @HEAD
- public Response headAcknowledgedConsumer(@PathParam("consumer-id") String
consumerId,
- @Context UriInfo uriInfo) throws Exception
- {
- AcknowledgedQueueConsumer consumer = (AcknowledgedQueueConsumer)
findAcknowledgedConsumer(consumerId);
- Response.ResponseBuilder builder = Response.ok();
- // we synchronize just in case a failed request is still processing
- synchronized (consumer)
- {
- Acknowledgement ack = consumer.getAck();
- if (ack == null || ack.wasSet())
- {
-
AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(),
builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/acknowledged/" +
consumer.getId(), Long.toString(consumer.getConsumeIndex()));
}
else
{
- consumer.setAcknowledgementLink(builder, uriInfo,
uriInfo.getMatchedURIs().get(1) + "/acknowledged/" + consumer.getId());
+ QueueConsumer tmp = new QueueConsumer(sessionFactory, destination,
consumerId, serviceManager, null);
+ consumer = addConsumerToMap(consumerId, tmp);
}
}
- return builder.build();
- }
-
-
- @Path("acknowledged/{consumer-id}")
- public QueueConsumer findAcknowledgedConsumer(
- @PathParam("consumer-id") String consumerId) throws Exception
- {
- QueueConsumer consumer = queueConsumers.get(consumerId);
- if (consumer == null)
- {
- QueueConsumer tmp = new AcknowledgedQueueConsumer(sessionFactory, destination,
consumerId, serviceManager);
- ;
- consumer = addConsumerToMap(consumerId, tmp);
- }
return consumer;
}
@@ -275,16 +284,8 @@
}
- @Path("acknowledged/{consumer-id}")
+ @Path("attributes-{attributes}/{consumer-id}")
@DELETE
- public void closeAcknowledgedSession(
- @PathParam("consumer-id") String consumerId)
- {
- closeSession(consumerId);
- }
-
- @Path("auto-ack/{consumer-id}")
- @DELETE
public void closeSession(
@PathParam("consumer-id") String consumerId)
{
Modified:
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java
===================================================================
---
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -6,6 +6,7 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.rest.util.HttpMessageHelper;
+import org.hornetq.api.core.Message;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
@@ -218,7 +219,7 @@
protected ClientMessage createHornetQMessage(HttpHeaders headers, byte[] body, boolean
durable, ClientSession session) throws Exception
{
- ClientMessage message = session.createMessage(durable);
+ ClientMessage message = session.createMessage(Message.BYTES_TYPE, durable);
HttpMessageHelper.writeHttpMessage(headers, body, message);
return message;
}
Modified:
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueConsumer.java
===================================================================
---
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueConsumer.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueConsumer.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -5,6 +5,7 @@
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.jms.client.SelectorTranslator;
import org.hornetq.rest.util.Constants;
import org.hornetq.rest.util.LinkStrategy;
@@ -37,6 +38,7 @@
protected long lastPing = System.currentTimeMillis();
protected DestinationServiceManager serviceManager;
protected boolean autoAck = true;
+ protected String selector;
/**
* token used to create consume-next links
@@ -70,14 +72,15 @@
lastPing = System.currentTimeMillis();
}
- public QueueConsumer(ClientSessionFactory factory, String destination, String id,
DestinationServiceManager serviceManager) throws HornetQException
+ public QueueConsumer(ClientSessionFactory factory, String destination, String id,
DestinationServiceManager serviceManager, String selector) throws HornetQException
{
this.factory = factory;
this.destination = destination;
this.id = id;
this.serviceManager = serviceManager;
+ this.selector = selector;
- createSession(factory, destination);
+ createSession();
}
public String getId()
@@ -191,11 +194,18 @@
}
}
- protected void createSession(ClientSessionFactory factory, String destination)
+ protected void createSession()
throws HornetQException
{
session = factory.createSession(true, true);
- consumer = session.createConsumer(destination);
+ if (selector == null)
+ {
+ consumer = session.createConsumer(destination);
+ }
+ else
+ {
+ consumer = session.createConsumer(destination,
SelectorTranslator.convertToHornetQFilterString(selector));
+ }
session.start();
}
Modified:
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java
===================================================================
---
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -7,6 +7,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.SelectorTranslator;
import org.hornetq.rest.queue.push.xml.PushRegistration;
/**
@@ -68,7 +69,14 @@
strategy.start();
session = factory.createSession(false, false);
- consumer = session.createConsumer(destination);
+ if (registration.getSelector() != null)
+ {
+ consumer = session.createConsumer(destination,
SelectorTranslator.convertToHornetQFilterString(registration.getSelector()));
+ }
+ else
+ {
+ consumer = session.createConsumer(destination);
+ }
consumer.setMessageHandler(this);
session.start();
log.info("Push consumer started for: " + registration.getTarget());
@@ -100,6 +108,7 @@
}
}
+ @Override
public void onMessage(ClientMessage clientMessage)
{
if (strategy.push(clientMessage) == false)
Modified:
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/xml/PushRegistration.java
===================================================================
---
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/xml/PushRegistration.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/xml/PushRegistration.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -18,7 +18,7 @@
*/
@XmlRootElement(name = "push-registration")
@XmlAccessorType(XmlAccessType.PROPERTY)
-@XmlType(propOrder = {"destination", "durable", "target",
"authenticationMechanism", "headers"})
+@XmlType(propOrder = {"destination", "durable", "selector",
"target", "authenticationMechanism", "headers"})
public class PushRegistration implements Serializable
{
private String id;
@@ -28,6 +28,7 @@
private List<XmlHttpHeader> headers = new ArrayList<XmlHttpHeader>();
private String destination;
private Object loadedFrom;
+ private String selector;
@XmlTransient
public Object getLoadedFrom()
@@ -73,6 +74,16 @@
this.durable = durable;
}
+ public String getSelector()
+ {
+ return selector;
+ }
+
+ public void setSelector(String selector)
+ {
+ this.selector = selector;
+ }
+
@XmlElementRef
public XmlLink getTarget()
{
Modified:
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/AcknowledgedSubscriptionResource.java
===================================================================
---
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/AcknowledgedSubscriptionResource.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/AcknowledgedSubscriptionResource.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -13,10 +13,10 @@
{
private boolean durable;
- public AcknowledgedSubscriptionResource(ClientSessionFactory factory, String
destination, String id, DestinationServiceManager serviceManager)
+ public AcknowledgedSubscriptionResource(ClientSessionFactory factory, String
destination, String id, DestinationServiceManager serviceManager, String selector)
throws HornetQException
{
- super(factory, destination, id, serviceManager);
+ super(factory, destination, id, serviceManager, selector);
}
public boolean isDurable()
Modified:
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java
===================================================================
---
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -18,6 +18,7 @@
super(dirname);
}
+ @Override
public synchronized List<PushTopicRegistration> getByTopic(String topic)
{
List<PushTopicRegistration> list = new
ArrayList<PushTopicRegistration>();
Modified:
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionResource.java
===================================================================
---
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionResource.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionResource.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -13,10 +13,10 @@
{
boolean durable;
- public SubscriptionResource(ClientSessionFactory factory, String destination, String
id, DestinationServiceManager serviceManager)
+ public SubscriptionResource(ClientSessionFactory factory, String destination, String
id, DestinationServiceManager serviceManager, String selector)
throws HornetQException
{
- super(factory, destination, id, serviceManager);
+ super(factory, destination, id, serviceManager, selector);
}
public boolean isDurable()
Modified:
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
===================================================================
---
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -84,6 +84,7 @@
private Object timeoutLock = new Object();
+ @Override
public void testTimeout(String target)
{
synchronized (timeoutLock)
@@ -127,6 +128,7 @@
public Response createSubscription(@FormParam("durable")
@DefaultValue("false") boolean durable,
@FormParam("autoAck")
@DefaultValue("true") boolean autoAck,
@FormParam("name") String
subscriptionName,
+ @FormParam("selector") String selector,
@Context UriInfo uriInfo)
{
if (subscriptionName != null)
@@ -185,7 +187,7 @@
session.createTemporaryQueue(destination, subscriptionName);
}
}
- QueueConsumer consumer = createConsumer(durable, autoAck, subscriptionName);
+ QueueConsumer consumer = createConsumer(durable, autoAck, subscriptionName,
selector);
queueConsumers.put(consumer.getId(), consumer);
serviceManager.getTimeoutTask().add(this, consumer.getId());
@@ -225,19 +227,19 @@
}
}
- protected QueueConsumer createConsumer(boolean durable, boolean autoAck, String
subscriptionName)
+ protected QueueConsumer createConsumer(boolean durable, boolean autoAck, String
subscriptionName, String selector)
throws HornetQException
{
QueueConsumer consumer;
if (autoAck)
{
- SubscriptionResource subscription = new SubscriptionResource(sessionFactory,
subscriptionName, subscriptionName, serviceManager);
+ SubscriptionResource subscription = new SubscriptionResource(sessionFactory,
subscriptionName, subscriptionName, serviceManager, selector);
subscription.setDurable(durable);
consumer = subscription;
}
else
{
- AcknowledgedSubscriptionResource subscription = new
AcknowledgedSubscriptionResource(sessionFactory, subscriptionName, subscriptionName,
serviceManager);
+ AcknowledgedSubscriptionResource subscription = new
AcknowledgedSubscriptionResource(sessionFactory, subscriptionName, subscriptionName,
serviceManager, selector);
subscription.setDurable(durable);
consumer = subscription;
}
@@ -375,7 +377,7 @@
QueueConsumer tmp = null;
try
{
- tmp = createConsumer(true, autoAck, subscriptionId);
+ tmp = createConsumer(true, autoAck, subscriptionId, null);
}
catch (HornetQException e)
{
Modified:
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/HttpMessageHelper.java
===================================================================
---
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/HttpMessageHelper.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/HttpMessageHelper.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -35,7 +35,7 @@
if (headerName == null) continue;
builder.header(headerName, message.getStringProperty(k));
}
- int size = message.getBodyBuffer().readInt();
+ int size = message.getBodySize();
if (size > 0)
{
byte[] body = new byte[size];
@@ -78,20 +78,23 @@
contentType = value;
}
}
- int size = message.getBodyBuffer().readInt();
+ int size = message.getBodySize();
if (size > 0)
{
- byte[] body = new byte[size];
- message.getBodyBuffer().readBytes(body);
Boolean aBoolean = message.getBooleanProperty(POSTED_AS_HTTP_MESSAGE);
if (aBoolean != null && aBoolean.booleanValue())
{
+ byte[] body = new byte[size];
+ message.getBodyBuffer().readBytes(body);
//System.out.println("Building Message from HTTP message");
request.body(contentType, body);
}
else
{
// assume posted as a JMS or HornetQ object message
+ size = message.getBodyBuffer().readInt();
+ byte[] body = new byte[size];
+ message.getBodyBuffer().readBytes(body);
ByteArrayInputStream bais = new ByteArrayInputStream(body);
Object obj = null;
try
@@ -123,7 +126,6 @@
}
}
message.putBooleanProperty(POSTED_AS_HTTP_MESSAGE, true);
- message.getBodyBuffer().writeInt(body.length);
message.getBodyBuffer().writeBytes(body);
}
Added:
branches/hornetq-416/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/SelectorTest.java
===================================================================
---
branches/hornetq-416/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/SelectorTest.java
(rev 0)
+++
branches/hornetq-416/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/SelectorTest.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -0,0 +1,304 @@
+package org.hornetq.rest.test;
+
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
+import org.hornetq.rest.HttpHeaderProperty;
+import org.hornetq.rest.queue.push.xml.XmlLink;
+import org.hornetq.rest.topic.PushTopicRegistration;
+import org.hornetq.rest.topic.TopicDeployment;
+import org.jboss.resteasy.client.ClientRequest;
+import org.jboss.resteasy.client.ClientResponse;
+import org.jboss.resteasy.spi.Link;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+
+import static org.jboss.resteasy.test.TestPortProvider.*;
+
+/**
+ * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
+ * @version $Revision: 1 $
+ */
+public class SelectorTest extends MessageTestBase
+{
+ public static ConnectionFactory connectionFactory;
+ public static String topicName =
HornetQDestination.createQueueAddressFromName("testTopic").toString();
+
+ @BeforeClass
+ public static void setup() throws Exception
+ {
+ connectionFactory = new
HornetQJMSConnectionFactory(manager.getQueueManager().getSessionFactory());
+ System.out.println("Queue name: " + topicName);
+ TopicDeployment deployment = new TopicDeployment();
+ deployment.setDuplicatesAllowed(true);
+ deployment.setDurableSend(false);
+ deployment.setName(topicName);
+ manager.getTopicManager().deploy(deployment);
+ }
+
+ @XmlRootElement
+ public static class Order implements Serializable
+ {
+ private String name;
+ private String amount;
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ public String getAmount()
+ {
+ return amount;
+ }
+
+ public void setAmount(String amount)
+ {
+ this.amount = amount;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Order order = (Order) o;
+
+ if (!amount.equals(order.amount)) return false;
+ if (!name.equals(order.name)) return false;
+
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Order{" +
+ "name='" + name + '\'' +
+ '}';
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = name.hashCode();
+ result = 31 * result + amount.hashCode();
+ return result;
+ }
+ }
+
+ public static Destination createDestination(String dest)
+ {
+ HornetQDestination destination = (HornetQDestination)
HornetQDestination.fromAddress(dest);
+ System.out.println("SimpleAddress: " + destination.getSimpleAddress());
+ return destination;
+ }
+
+ public static void publish(String dest, Serializable object, String contentType,
String tag) throws Exception
+ {
+ Connection conn = connectionFactory.createConnection();
+ try
+ {
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = createDestination(dest);
+ MessageProducer producer = session.createProducer(destination);
+ ObjectMessage message = session.createObjectMessage();
+
+ if (contentType != null)
+ {
+ message.setStringProperty(HttpHeaderProperty.CONTENT_TYPE, contentType);
+ }
+ if (tag != null)
+ {
+ message.setStringProperty("MyTag", tag);
+ }
+ message.setObject(object);
+
+ producer.send(message);
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ @Path("/push")
+ public static class PushReceiver
+ {
+ public static Order oneOrder;
+ public static Order twoOrder;
+
+ @POST
+ @Path("one")
+ public void one(Order order)
+ {
+ oneOrder = order;
+ }
+
+ @POST
+ @Path("two")
+ public void two(Order order)
+ {
+ twoOrder = order;
+ }
+
+
+ }
+
+ @Test
+ public void testPush() throws Exception
+ {
+
server.getJaxrsServer().getDeployment().getRegistry().addPerRequestResource(PushReceiver.class);
+ ClientRequest request = new ClientRequest(generateURL("/topics/" +
topicName));
+
+ ClientResponse response = request.head();
+ Assert.assertEquals(200, response.getStatus());
+ Link consumers =
MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response,
"push-subscriptions");
+ System.out.println("push: " + consumers);
+
+ PushTopicRegistration oneReg = new PushTopicRegistration();
+ oneReg.setDurable(false);
+ XmlLink target = new XmlLink();
+ target.setMethod("post");
+ target.setHref(generateURL("/push/one"));
+ target.setType("application/xml");
+ oneReg.setTarget(target);
+ oneReg.setSelector("MyTag = '1'");
+ response = consumers.request().body("application/xml", oneReg).post();
+ Link oneSubscription = response.getLocation();
+
+ PushTopicRegistration twoReg = new PushTopicRegistration();
+ twoReg.setDurable(false);
+ target = new XmlLink();
+ target.setMethod("post");
+ target.setHref(generateURL("/push/two"));
+ target.setType("application/xml");
+ twoReg.setTarget(target);
+ twoReg.setSelector("MyTag = '2'");
+ response = consumers.request().body("application/xml", twoReg).post();
+ Link twoSubscription = response.getLocation();
+
+ Order order = new Order();
+ order.setName("1");
+ order.setAmount("$5.00");
+ publish(topicName, order, null, "1");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.oneOrder);
+
+ order.setName("2");
+ publish(topicName, order, null, "2");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.twoOrder);
+
+ order.setName("3");
+ publish(topicName, order, null, "2");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.twoOrder);
+
+ order.setName("4");
+ publish(topicName, order, null, "1");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.oneOrder);
+
+ order.setName("5");
+ publish(topicName, order, null, "1");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.oneOrder);
+
+ order.setName("6");
+ publish(topicName, order, null, "1");
+ Thread.sleep(200);
+ Assert.assertEquals(order, PushReceiver.oneOrder);
+
+ oneSubscription.request().delete();
+ twoSubscription.request().delete();
+
+
+ }
+
+
+ @Test
+ public void testPull() throws Exception
+ {
+ ClientRequest request = new ClientRequest(generateURL("/topics/" +
topicName));
+
+ ClientResponse response = request.head();
+ Assert.assertEquals(200, response.getStatus());
+ Link consumers =
MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response,
"pull-subscriptions");
+ System.out.println("pull: " + consumers);
+ response = consumers.request().formParameter("autoAck",
"true")
+ .formParameter("selector", "MyTag =
'1'").post();
+
+ Link consumeOne =
MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response,
"consume-next");
+ System.out.println("consumeOne: " + consumeOne);
+ response = consumers.request().formParameter("autoAck",
"true")
+ .formParameter("selector", "MyTag =
'2'").post();
+ Link consumeTwo =
MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response,
"consume-next");
+ System.out.println("consumeTwo: " + consumeTwo);
+
+
+ // test that Accept header is used to set content-type
+ {
+ Order order = new Order();
+ order.setName("1");
+ order.setAmount("$5.00");
+ publish(topicName, order, null, "1");
+ order.setName("2");
+ publish(topicName, order, null, "2");
+ order.setName("3");
+ publish(topicName, order, null, "2");
+ order.setName("4");
+ publish(topicName, order, null, "1");
+ order.setName("5");
+ publish(topicName, order, null, "1");
+ order.setName("6");
+ publish(topicName, order, null, "1");
+
+ {
+ order.setName("1");
+ consumeOne = consumeOrder(order, consumeOne);
+ order.setName("2");
+ consumeTwo = consumeOrder(order, consumeTwo);
+ order.setName("3");
+ consumeTwo = consumeOrder(order, consumeTwo);
+ order.setName("4");
+ consumeOne = consumeOrder(order, consumeOne);
+ order.setName("5");
+ consumeOne = consumeOrder(order, consumeOne);
+ order.setName("6");
+ consumeOne = consumeOrder(order, consumeOne);
+ }
+ }
+ }
+
+ private Link consumeOrder(Order order, Link consumeNext)
+ throws Exception
+ {
+ ClientResponse res = consumeNext.request().header("Accept-Wait",
"4").accept("application/xml").post(String.class);
+ Assert.assertEquals(200, res.getStatus());
+ Assert.assertEquals("application/xml",
res.getHeaders().getFirst("Content-Type").toString().toLowerCase());
+ Order order2 = (Order) res.getEntity(Order.class);
+ Assert.assertEquals(order, order2);
+ consumeNext =
MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), res,
"consume-next");
+ Assert.assertNotNull(consumeNext);
+ return consumeNext;
+ }
+}
\ No newline at end of file
Modified: branches/hornetq-416/pom.xml
===================================================================
--- branches/hornetq-416/pom.xml 2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/pom.xml 2010-11-01 10:17:23 UTC (rev 9826)
@@ -253,7 +253,7 @@
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
- <version>2.1.2</version>
+ <version>2.1.6</version>
</dependency>
<!-- needed to compile the tests-->
<dependency>
Modified: branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
---
branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -549,15 +549,13 @@
for (int i = 0; i < data; i++)
{
char b = (char)bytes[i];
-
- if (b == '\n')
+
+ if (b < 33 || b > 136)
{
- str.append("\\n");
+ //Unreadable characters
+
+ str.append(bytes[i]);
}
- else if (b == 0)
- {
- str.append("NUL");
- }
else
{
str.append(b);
Modified:
branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
---
branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -39,6 +39,7 @@
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.remoting.Connection;
+import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.utils.UUIDGenerator;
/**
@@ -577,7 +578,13 @@
String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
- server.getSecurityManager().validateUser(login, passcode);
+ HornetQSecurityManager sm = server.getSecurityManager();
+
+ // The sm will be null case security is not enabled...
+ if (sm != null)
+ {
+ sm.validateUser(login, passcode);
+ }
connection.setLogin(login);
connection.setPasscode(passcode);
Modified:
branches/hornetq-416/src/main/org/hornetq/integration/twitter/TwitterConstants.java
===================================================================
---
branches/hornetq-416/src/main/org/hornetq/integration/twitter/TwitterConstants.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/src/main/org/hornetq/integration/twitter/TwitterConstants.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -50,32 +50,42 @@
public static final Set<String> ALLOWABLE_OUTGOING_CONNECTOR_KEYS;
public static final Set<String> REQUIRED_OUTGOING_CONNECTOR_KEYS;
- public static final String USER_NAME = "username";
- public static final String PASSWORD = "password";
+ public static final String CONSUMER_KEY = "consumerKey";
+ public static final String CONSUMER_SECRET = "consumerSecret";
+ public static final String ACCESS_TOKEN ="accessToken";
+ public static final String ACCESS_TOKEN_SECRET = "accessTokenSecret";
public static final String QUEUE_NAME = "queue";
public static final String INCOMING_INTERVAL = "interval";
static
{
ALLOWABLE_INCOMING_CONNECTOR_KEYS = new HashSet<String>();
- ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(USER_NAME);
- ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(PASSWORD);
+ ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(CONSUMER_KEY);
+ ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(CONSUMER_SECRET);
+ ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN);
+ ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET);
ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME);
ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(INCOMING_INTERVAL);
REQUIRED_INCOMING_CONNECTOR_KEYS = new HashSet<String>();
- REQUIRED_INCOMING_CONNECTOR_KEYS.add(USER_NAME);
- REQUIRED_INCOMING_CONNECTOR_KEYS.add(PASSWORD);
+ REQUIRED_INCOMING_CONNECTOR_KEYS.add(CONSUMER_KEY);
+ REQUIRED_INCOMING_CONNECTOR_KEYS.add(CONSUMER_SECRET);
+ REQUIRED_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN);
+ REQUIRED_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET);
REQUIRED_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME);
ALLOWABLE_OUTGOING_CONNECTOR_KEYS = new HashSet<String>();
- ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(USER_NAME);
- ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(PASSWORD);
+ ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_KEY);
+ ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_SECRET);
+ ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN);
+ ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET);
ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME);
REQUIRED_OUTGOING_CONNECTOR_KEYS = new HashSet<String>();
- REQUIRED_OUTGOING_CONNECTOR_KEYS.add(USER_NAME);
- REQUIRED_OUTGOING_CONNECTOR_KEYS.add(PASSWORD);
+ REQUIRED_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_KEY);
+ REQUIRED_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_SECRET);
+ REQUIRED_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN);
+ REQUIRED_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET);
REQUIRED_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME);
}
}
Modified:
branches/hornetq-416/src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java
===================================================================
---
branches/hornetq-416/src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -23,7 +23,7 @@
import org.hornetq.integration.twitter.TwitterConstants;
import org.hornetq.utils.ConfigurationHelper;
import twitter4j.*;
-
+import twitter4j.http.AccessToken;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -39,10 +39,14 @@
private final String connectorName;
- private final String userName;
+ private final String consumerKey;
- private final String password;
+ private final String consumerSecret;
+ private final String accessToken;
+
+ private final String accessTokenSecret;
+
private final String queueName;
private final int intervalSeconds;
@@ -59,17 +63,19 @@
private final ScheduledExecutorService scheduledPool;
- private ScheduledFuture scheduledFuture;
+ private ScheduledFuture<?> scheduledFuture;
- public IncomingTweetsHandler(final String connectorName,
+ public IncomingTweetsHandler(final String connectorName,
final Map<String, Object> configuration,
final StorageManager storageManager,
final PostOffice postOffice,
final ScheduledExecutorService scheduledThreadPool)
{
this.connectorName = connectorName;
- this.userName = ConfigurationHelper.getStringProperty(TwitterConstants.USER_NAME,
null, configuration);
- this.password = ConfigurationHelper.getStringProperty(TwitterConstants.PASSWORD,
null, configuration);
+ this.consumerKey =
ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_KEY, null,
configuration);
+ this.consumerSecret =
ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_SECRET, null,
configuration);
+ this.accessToken =
ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN, null,
configuration);
+ this.accessTokenSecret =
ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN_SECRET, null,
configuration);
this.queueName = ConfigurationHelper.getStringProperty(TwitterConstants.QUEUE_NAME,
null, configuration);
Integer intervalSeconds =
ConfigurationHelper.getIntProperty(TwitterConstants.INCOMING_INTERVAL, 0, configuration);
if (intervalSeconds > 0)
@@ -95,9 +101,12 @@
paging = new Paging();
TwitterFactory tf = new TwitterFactory();
- this.twitter = tf.getInstance(userName, password);
+ this.twitter = tf.getOAuthAuthorizedInstance(this.consumerKey,
+ this.consumerSecret,
+ new AccessToken(this.accessToken,
+
this.accessTokenSecret));
this.twitter.verifyCredentials();
-
+
// getting latest ID
this.paging.setCount(TwitterConstants.FIRST_ATTEMPT_PAGE_SIZE);
ResponseList<Status> res = this.twitter.getHomeTimeline(paging);
Modified:
branches/hornetq-416/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java
===================================================================
---
branches/hornetq-416/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -21,6 +21,7 @@
import org.hornetq.integration.twitter.TwitterConstants;
import org.hornetq.utils.ConfigurationHelper;
import twitter4j.*;
+import twitter4j.http.AccessToken;
import java.util.Map;
@@ -34,10 +35,14 @@
private final String connectorName;
- private final String userName;
+ private final String consumerKey;
- private final String password;
+ private final String consumerSecret;
+ private final String accessToken;
+
+ private final String accessTokenSecret;
+
private final String queueName;
private final PostOffice postOffice;
@@ -50,13 +55,15 @@
private boolean isStarted = false;
- public OutgoingTweetsHandler(final String connectorName,
+ public OutgoingTweetsHandler(final String connectorName,
final Map<String, Object> configuration,
final PostOffice postOffice)
{
this.connectorName = connectorName;
- this.userName = ConfigurationHelper.getStringProperty(TwitterConstants.USER_NAME,
null, configuration);
- this.password = ConfigurationHelper.getStringProperty(TwitterConstants.PASSWORD,
null, configuration);
+ this.consumerKey =
ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_KEY, null,
configuration);
+ this.consumerSecret =
ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_SECRET, null,
configuration);
+ this.accessToken =
ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN, null,
configuration);
+ this.accessTokenSecret =
ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN_SECRET, null,
configuration);
this.queueName = ConfigurationHelper.getStringProperty(TwitterConstants.QUEUE_NAME,
null, configuration);
this.postOffice = postOffice;
}
@@ -91,8 +98,12 @@
this.queue = (Queue)b.getBindable();
TwitterFactory tf = new TwitterFactory();
- this.twitter = tf.getInstance(userName, password);
+ this.twitter = tf.getOAuthAuthorizedInstance(this.consumerKey,
+ this.consumerSecret,
+ new AccessToken(this.accessToken,
+
this.accessTokenSecret));
this.twitter.verifyCredentials();
+
// TODO make filter-string configurable
// this.filter = FilterImpl.createFilter(filterString);
this.filter = null;
Modified:
branches/hornetq-416/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java
===================================================================
---
branches/hornetq-416/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java 2010-11-01
09:44:00 UTC (rev 9825)
+++
branches/hornetq-416/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java 2010-11-01
10:17:23 UTC (rev 9826)
@@ -38,6 +38,7 @@
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
import twitter4j.*;
+import twitter4j.http.AccessToken;
/**
* A TwitterTest
@@ -50,19 +51,23 @@
{
private static final Logger log = Logger.getLogger(TwitterTest.class);
private static final String KEY_CONNECTOR_NAME = "connector.name";
- private static final String KEY_USERNAME = "username";
- private static final String KEY_PASSWORD = "password";
+ private static final String KEY_CONSUMER_KEY = "consumerKey";
+ private static final String KEY_CONSUMER_SECRET = "consumerSecret";
+ private static final String KEY_ACCESS_TOKEN = "accessToken";
+ private static final String KEY_ACCESS_TOKEN_SECRET = "accessTokenSecret";
private static final String KEY_QUEUE_NAME = "queue.name";
- private static final String TWITTER_USERNAME =
System.getProperty("twitter.username");
- private static final String TWITTER_PASSWORD =
System.getProperty("twitter.password");
-
+ private static final String TWITTER_CONSUMER_KEY =
System.getProperty("twitter.consumerKey");
+ private static final String TWITTER_CONSUMER_SECRET =
System.getProperty("twitter.consumerSecret");
+ private static final String TWITTER_ACCESS_TOKEN =
System.getProperty("twitter.accessToken");
+ private static final String TWITTER_ACCESS_TOKEN_SECRET =
System.getProperty("twitter.accessTokenSecret");
+
@Override
protected void setUp() throws Exception
{
- if(TWITTER_USERNAME == null || TWITTER_PASSWORD == null)
+ if(TWITTER_CONSUMER_KEY == null || TWITTER_CONSUMER_SECRET == null ||
TWITTER_ACCESS_TOKEN == null || TWITTER_ACCESS_TOKEN_SECRET == null)
{
- throw new Exception("* * * Please set twitter.username and
twitter.password in system property * * *");
+ throw new Exception("* * * Please set twitter.consumerKey,
twitter.consumerSecret, twitter.accessToken and twitter.accessTokenSecuret in system
property * * *");
}
super.setUp();
}
@@ -101,8 +106,10 @@
public void testIncomingWithInvalidCredentials() throws Exception
{
HashMap<String,String> params = new HashMap<String,String>();
- params.put(KEY_USERNAME, "invalidUsername");
- params.put(KEY_PASSWORD, "invalidPassword");
+ params.put(KEY_CONSUMER_KEY, "invalidConsumerKey");
+ params.put(KEY_CONSUMER_SECRET, "invalidConsumerSecret");
+ params.put(KEY_ACCESS_TOKEN, "invalidAccessToken");
+ params.put(KEY_ACCESS_TOKEN_SECRET, "invalidAcccessTokenSecret");
internalTestIncomingFailedToInitialize(params);
}
@@ -139,18 +146,14 @@
public void testOutgoingWithInvalidCredentials() throws Exception
{
HashMap<String,String> params = new HashMap<String,String>();
- params.put(KEY_USERNAME, "invalidUsername");
- params.put(KEY_PASSWORD, "invalidPassword");
+ params.put(KEY_CONSUMER_KEY, "invalidConsumerKey");
+ params.put(KEY_CONSUMER_SECRET, "invalidConsumerSecret");
+ params.put(KEY_ACCESS_TOKEN, "invalidAccessToken");
+ params.put(KEY_ACCESS_TOKEN_SECRET, "invalidAcccessTokenSecret");
internalTestOutgoingFailedToInitialize(params);
}
- /**
- * This will fail until TFJ-347 is fixed.
- *
http://twitter4j.org/jira/browse/TFJ-347
- *
- * @throws Exception
- */
- public void _testOutgoingWithInReplyTo() throws Exception
+ public void testOutgoingWithInReplyTo() throws Exception
{
internalTestOutgoingWithInReplyTo();
}
@@ -161,7 +164,10 @@
ClientSession session = null;
String queue = "TwitterTestQueue";
int interval = 5;
- Twitter twitter = new
TwitterFactory().getInstance(TWITTER_USERNAME,TWITTER_PASSWORD);
+ Twitter twitter = new
TwitterFactory().getOAuthAuthorizedInstance(TWITTER_CONSUMER_KEY,
+
TWITTER_CONSUMER_SECRET,
+ new
AccessToken(TWITTER_ACCESS_TOKEN,
+
TWITTER_ACCESS_TOKEN_SECRET));
String testMessage = "TwitterTest/incoming: " +
System.currentTimeMillis();
log.debug("test incoming: " + testMessage);
@@ -171,8 +177,10 @@
HashMap<String, Object> config = new HashMap<String, Object>();
config.put(TwitterConstants.INCOMING_INTERVAL, interval);
config.put(TwitterConstants.QUEUE_NAME, queue);
- config.put(TwitterConstants.USER_NAME, TWITTER_USERNAME);
- config.put(TwitterConstants.PASSWORD, TWITTER_PASSWORD);
+ config.put(TwitterConstants.CONSUMER_KEY, TWITTER_CONSUMER_KEY);
+ config.put(TwitterConstants.CONSUMER_SECRET, TWITTER_CONSUMER_SECRET);
+ config.put(TwitterConstants.ACCESS_TOKEN, TWITTER_ACCESS_TOKEN);
+ config.put(TwitterConstants.ACCESS_TOKEN_SECRET, TWITTER_ACCESS_TOKEN_SECRET);
ConnectorServiceConfiguration inconf =
new ConnectorServiceConfiguration(
TwitterIncomingConnectorServiceFactory.class.getName(),
@@ -244,22 +252,32 @@
HornetQServer server0 = null;
String connectorName = "test-incoming-connector";
String queue = "TwitterTestQueue";
- String userName = "invalidUsername";
- String password = "invalidPassword";
+ String consumerKey = "invalidConsumerKey";
+ String consumerSecret = "invalidConsumerSecret";
+ String accessToken = "invalidAccessToken";
+ String accessTokenSecret = "invalidAccessTokenSecret";
int interval = 5;
if(params.containsKey(KEY_CONNECTOR_NAME))
{
connectorName = params.get(KEY_CONNECTOR_NAME);
}
- if(params.containsKey(KEY_USERNAME))
+ if(params.containsKey(KEY_CONSUMER_KEY))
{
- userName = params.get(KEY_USERNAME);
+ consumerKey = params.get(KEY_CONSUMER_KEY);
}
- if(params.containsKey(KEY_PASSWORD))
+ if(params.containsKey(KEY_CONSUMER_SECRET))
{
- password = params.get(KEY_PASSWORD);
+ consumerSecret = params.get(KEY_CONSUMER_SECRET);
}
+ if(params.containsKey(KEY_ACCESS_TOKEN))
+ {
+ accessToken = params.get(KEY_ACCESS_TOKEN);
+ }
+ if(params.containsKey(KEY_ACCESS_TOKEN_SECRET))
+ {
+ accessTokenSecret = params.get(KEY_ACCESS_TOKEN_SECRET);
+ }
if(params.containsKey(KEY_QUEUE_NAME))
{
queue = params.get(KEY_QUEUE_NAME);
@@ -271,8 +289,10 @@
HashMap<String, Object> config = new HashMap<String, Object>();
config.put(TwitterConstants.INCOMING_INTERVAL, interval);
config.put(TwitterConstants.QUEUE_NAME, queue);
- config.put(TwitterConstants.USER_NAME, userName);
- config.put(TwitterConstants.PASSWORD, password);
+ config.put(TwitterConstants.CONSUMER_KEY, consumerKey);
+ config.put(TwitterConstants.CONSUMER_SECRET, consumerSecret);
+ config.put(TwitterConstants.ACCESS_TOKEN, accessToken);
+ config.put(TwitterConstants.ACCESS_TOKEN_SECRET, accessTokenSecret);
ConnectorServiceConfiguration inconf =
new
ConnectorServiceConfiguration(TwitterIncomingConnectorServiceFactory.class.getName(),
config,
@@ -306,7 +326,10 @@
HornetQServer server0 = null;
ClientSession session = null;
String queue = "TwitterTestQueue";
- Twitter twitter = new
TwitterFactory().getInstance(TWITTER_USERNAME,TWITTER_PASSWORD);
+ Twitter twitter = new
TwitterFactory().getOAuthAuthorizedInstance(TWITTER_CONSUMER_KEY,
+
TWITTER_CONSUMER_SECRET,
+ new
AccessToken(TWITTER_ACCESS_TOKEN,
+
TWITTER_ACCESS_TOKEN_SECRET));
String testMessage = "TwitterTest/outgoing: " +
System.currentTimeMillis();
log.debug("test outgoing: " + testMessage);
@@ -315,8 +338,10 @@
Configuration configuration = createDefaultConfig(false);
HashMap<String, Object> config = new HashMap<String, Object>();
config.put(TwitterConstants.QUEUE_NAME, queue);
- config.put(TwitterConstants.USER_NAME, TWITTER_USERNAME);
- config.put(TwitterConstants.PASSWORD, TWITTER_PASSWORD);
+ config.put(TwitterConstants.CONSUMER_KEY, TWITTER_CONSUMER_KEY);
+ config.put(TwitterConstants.CONSUMER_SECRET, TWITTER_CONSUMER_SECRET);
+ config.put(TwitterConstants.ACCESS_TOKEN, TWITTER_ACCESS_TOKEN);
+ config.put(TwitterConstants.ACCESS_TOKEN_SECRET, TWITTER_ACCESS_TOKEN_SECRET);
ConnectorServiceConfiguration outconf =
new
ConnectorServiceConfiguration(TwitterOutgoingConnectorServiceFactory.class.getName(),
config,
@@ -388,25 +413,35 @@
protected void internalTestOutgoingFailedToInitialize(HashMap<String,String>
params) throws Exception
{
HornetQServer server0 = null;
- String connectorName = "test-outgoing-connector";
+ String connectorName = "test-outgoing-connector";
String queue = "TwitterTestQueue";
- String userName = TWITTER_USERNAME;
- String password = TWITTER_PASSWORD;
+ String consumerKey = TWITTER_CONSUMER_KEY;
+ String consumerSecret = TWITTER_CONSUMER_SECRET;
+ String accessToken = TWITTER_ACCESS_TOKEN;
+ String accessTokenSecret = TWITTER_ACCESS_TOKEN_SECRET;
if(params.containsKey(KEY_CONNECTOR_NAME))
{
connectorName = params.get(KEY_CONNECTOR_NAME);
}
- if(params.containsKey(KEY_USERNAME))
+ if (params.containsKey(KEY_CONSUMER_KEY))
{
- userName = params.get(KEY_USERNAME);
+ consumerKey = params.get(KEY_CONSUMER_KEY);
}
- if(params.containsKey(KEY_PASSWORD))
+ if (params.containsKey(KEY_CONSUMER_SECRET))
{
- password = params.get(KEY_PASSWORD);
+ consumerSecret = params.get(KEY_CONSUMER_SECRET);
}
- if(params.containsKey(KEY_QUEUE_NAME))
+ if (params.containsKey(KEY_ACCESS_TOKEN))
{
+ accessToken = params.get(KEY_ACCESS_TOKEN);
+ }
+ if (params.containsKey(KEY_ACCESS_TOKEN_SECRET))
+ {
+ accessTokenSecret = params.get(KEY_ACCESS_TOKEN_SECRET);
+ }
+ if (params.containsKey(KEY_QUEUE_NAME))
+ {
queue = params.get(KEY_QUEUE_NAME);
}
@@ -415,12 +450,14 @@
Configuration configuration = createDefaultConfig(false);
HashMap<String, Object> config = new HashMap<String, Object>();
config.put(TwitterConstants.QUEUE_NAME, queue);
- config.put(TwitterConstants.USER_NAME, userName);
- config.put(TwitterConstants.PASSWORD, password);
+ config.put(TwitterConstants.CONSUMER_KEY, consumerKey);
+ config.put(TwitterConstants.CONSUMER_SECRET, consumerSecret);
+ config.put(TwitterConstants.ACCESS_TOKEN, accessToken);
+ config.put(TwitterConstants.ACCESS_TOKEN_SECRET, accessTokenSecret);
ConnectorServiceConfiguration outconf =
new
ConnectorServiceConfiguration(TwitterOutgoingConnectorServiceFactory.class.getName(),
config,
- "test-outgoing-connector");
+ connectorName);
configuration.getConnectorServiceConfigurations().add(outconf);
CoreQueueConfiguration qc = new CoreQueueConfiguration(queue, queue, null,
false);
configuration.getQueueConfigurations().add(qc);
@@ -446,16 +483,21 @@
HornetQServer server0 = null;
ClientSession session = null;
String queue = "TwitterTestQueue";
- Twitter twitter = new
TwitterFactory().getInstance(TWITTER_USERNAME,TWITTER_PASSWORD);
+ Twitter twitter = new
TwitterFactory().getOAuthAuthorizedInstance(TWITTER_CONSUMER_KEY,
+
TWITTER_CONSUMER_SECRET,
+ new
AccessToken(TWITTER_ACCESS_TOKEN,
+
TWITTER_ACCESS_TOKEN_SECRET));
String testMessage = "TwitterTest/outgoing with in_reply_to: " +
System.currentTimeMillis();
- String replyMessage = "@" + TWITTER_USERNAME + "
TwitterTest/outgoing reply: " + System.currentTimeMillis();
+ String replyMessage = "@" + twitter.getScreenName() + "
TwitterTest/outgoing reply: " + System.currentTimeMillis();
try
{
Configuration configuration = createDefaultConfig(false);
HashMap<String, Object> config = new HashMap<String, Object>();
config.put(TwitterConstants.QUEUE_NAME, queue);
- config.put(TwitterConstants.USER_NAME, TWITTER_USERNAME);
- config.put(TwitterConstants.PASSWORD, TWITTER_PASSWORD);
+ config.put(TwitterConstants.CONSUMER_KEY, TWITTER_CONSUMER_KEY);
+ config.put(TwitterConstants.CONSUMER_SECRET, TWITTER_CONSUMER_SECRET);
+ config.put(TwitterConstants.ACCESS_TOKEN, TWITTER_ACCESS_TOKEN);
+ config.put(TwitterConstants.ACCESS_TOKEN_SECRET, TWITTER_ACCESS_TOKEN_SECRET);
ConnectorServiceConfiguration outconf =
new
ConnectorServiceConfiguration(TwitterOutgoingConnectorServiceFactory.class.getName(),
config,