[hornetq-commits] JBoss hornetq SVN: r9826 - in branches/hornetq-416: docs/user-manual/en and 16 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 1 06:17:24 EDT 2010


Author: gaohoward
Date: 2010-11-01 06:17:23 -0400 (Mon, 01 Nov 2010)
New Revision: 9826

Added:
   branches/hornetq-416/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/SelectorTest.java
Modified:
   branches/hornetq-416/build-hornetq.properties
   branches/hornetq-416/build-hornetq.xml
   branches/hornetq-416/docs/user-manual/en/configuration-index.xml
   branches/hornetq-416/docs/user-manual/en/embedding-hornetq.xml
   branches/hornetq-416/examples/common/build.xml
   branches/hornetq-416/examples/core/twitter-connector/build.xml
   branches/hornetq-416/examples/core/twitter-connector/server0/hornetq-configuration.xml
   branches/hornetq-416/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java
   branches/hornetq-416/hornetq-rest/docbook/reference/en/master.xml
   branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/Hornetq.java
   branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/Jms.java
   branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/AcknowledgedQueueConsumer.java
   branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumedHttpMessage.java
   branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java
   branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java
   branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueConsumer.java
   branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java
   branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/xml/PushRegistration.java
   branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/AcknowledgedSubscriptionResource.java
   branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java
   branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionResource.java
   branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
   branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/HttpMessageHelper.java
   branches/hornetq-416/pom.xml
   branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
   branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   branches/hornetq-416/src/main/org/hornetq/integration/twitter/TwitterConstants.java
   branches/hornetq-416/src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java
   branches/hornetq-416/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java
   branches/hornetq-416/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java
Log:
back merge from trunk


Modified: branches/hornetq-416/build-hornetq.properties
===================================================================
--- branches/hornetq-416/build-hornetq.properties	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/build-hornetq.properties	2010-11-01 10:17:23 UTC (rev 9826)
@@ -9,6 +9,7 @@
 javac.include.ant.runtime=false
 javac.include.java.runtime=true
 javac.fail.onerror=true
+javac.encoding=utf-8
 
 # JUnit properties
 junit.showoutput=true

Modified: branches/hornetq-416/build-hornetq.xml
===================================================================
--- branches/hornetq-416/build-hornetq.xml	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/build-hornetq.xml	2010-11-01 10:17:23 UTC (rev 9826)
@@ -449,6 +449,7 @@
              deprecation="${javac.deprecation}"
              includeAntRuntime="${javac.include.ant.runtime}"
              includeJavaRuntime="${javac.include.java.runtime}"
+             encoding="${javac.encoding}"
              failonerror="${javac.fail.onerror}">
          <src>
             <pathelement path="${build.src.dir}"/>
@@ -493,6 +494,7 @@
              deprecation="${javac.deprecation}"
              includeAntRuntime="${javac.include.ant.runtime}"
              includeJavaRuntime="${javac.include.java.runtime}"
+             encoding="${javac.encoding}"
              failonerror="${javac.fail.onerror}">
          <src>
             <pathelement path="${build.src.dir}"/>
@@ -531,6 +533,7 @@
              deprecation="${javac.deprecation}"
              includeAntRuntime="${javac.include.ant.runtime}"
              includeJavaRuntime="${javac.include.java.runtime}"
+             encoding="${javac.encoding}"
              failonerror="${javac.fail.onerror}">
          <src>
             <pathelement path="${src.main.dir}"/>
@@ -551,6 +554,7 @@
 	             deprecation="${javac.deprecation}"
 	             includeAntRuntime="${javac.include.ant.runtime}"
 	             includeJavaRuntime="${javac.include.java.runtime}"
+                     encoding="${javac.encoding}"
 	             failonerror="${javac.fail.onerror}">
 	         <src>
 	            <pathelement path="${src.main.dir}"/>
@@ -571,6 +575,7 @@
              deprecation="${javac.deprecation}"
              includeAntRuntime="${javac.include.ant.runtime}"
              includeJavaRuntime="${javac.include.java.runtime}"
+             encoding="${javac.encoding}"
              failonerror="${javac.fail.onerror}">
          <src>
             <pathelement path="${src.main.dir}"/>
@@ -591,6 +596,7 @@
              deprecation="${javac.deprecation}"
              includeAntRuntime="${javac.include.ant.runtime}"
              includeJavaRuntime="${javac.include.java.runtime}"
+             encoding="${javac.encoding}"
              failonerror="${javac.fail.onerror}">
          <src>
             <pathelement path="${src.main.dir}"/>
@@ -611,6 +617,7 @@
              deprecation="${javac.deprecation}"
              includeAntRuntime="${javac.include.ant.runtime}"
              includeJavaRuntime="${javac.include.java.runtime}"
+             encoding="${javac.encoding}"
              failonerror="${javac.fail.onerror}">
          <src>
             <pathelement path="${src.main.dir}"/>
@@ -632,6 +639,7 @@
              deprecation="${javac.deprecation}"
              includeAntRuntime="${javac.include.ant.runtime}"
              includeJavaRuntime="${javac.include.java.runtime}"
+             encoding="${javac.encoding}"
              failonerror="${javac.fail.onerror}">
          <src>
             <pathelement path="${src.main.dir}"/>
@@ -652,6 +660,7 @@
              deprecation="${javac.deprecation}"
              includeAntRuntime="${javac.include.ant.runtime}"
              includeJavaRuntime="${javac.include.java.runtime}"
+             encoding="${javac.encoding}"
              failonerror="${javac.fail.onerror}">
          <src>
             <pathelement path="${src.main.dir}"/>
@@ -672,6 +681,7 @@
              deprecation="${javac.deprecation}"
              includeAntRuntime="${javac.include.ant.runtime}"
              includeJavaRuntime="${javac.include.java.runtime}"
+             encoding="${javac.encoding}"
              failonerror="${javac.fail.onerror}">
          <src>
             <pathelement path="${src.main.dir}"/>
@@ -692,6 +702,7 @@
              deprecation="${javac.deprecation}"
              includeAntRuntime="${javac.include.ant.runtime}"
              includeJavaRuntime="${javac.include.java.runtime}"
+             encoding="${javac.encoding}"
              failonerror="${javac.fail.onerror}">
          <src>
             <pathelement path="${build.src.dir}"/>
@@ -1586,6 +1597,7 @@
              includeAntRuntime="${javac.include.ant.runtime}"
              includeJavaRuntime="${javac.include.java.runtime}"
              failonerror="${javac.fail.onerror}"
+             encoding="${javac.encoding}"
              srcdir="${test.src.dir}"
              destdir="${test.classes.dir}">
          <classpath refid="test.compilation.classpath"/>
@@ -1604,6 +1616,7 @@
              includeAntRuntime="true"
              includeJavaRuntime="${javac.include.java.runtime}"
              failonerror="${javac.fail.onerror}"
+             encoding="${javac.encoding}"
              srcdir="${test.jms.src.dir}"
              destdir="${test.jms.classes.dir}">
          <classpath refid="jms.test.compilation.classpath"/>
@@ -1622,6 +1635,7 @@
              includeAntRuntime="true"
              includeJavaRuntime="${javac.include.java.runtime}"
              failonerror="${javac.fail.onerror}"
+             encoding="${javac.encoding}"
              srcdir="${test.joram.src.dir}"
              destdir="${test.joram.classes.dir}">
           <classpath refid="joram.test.compilation.classpath"/>
@@ -1696,8 +1710,10 @@
              timeout="${junit.timeout}">
          <sysproperty key="user.home" value="${user.home}"/>
          <sysproperty key="java.io.tmpdir" value="${java.io.tmpdir}"/>
-      	 <sysproperty key="twitter.username" value="${twitter.username}"/>
-      	 <sysproperty key="twitter.password" value="${twitter.password}"/>
+         <sysproperty key="twitter.consumerKey" value="${twitter.consumerKey}"/>
+         <sysproperty key="twitter.consumerSecret" value="${twitter.consumerSecret}"/>
+         <sysproperty key="twitter.accessToken" value="${twitter.accessToken}"/>
+         <sysproperty key="twitter.accessTokenSecret" value="${twitter.accessTokenSecret}"/>
       	 <jvmarg value="-Djava.library.path=native/bin"/>
          <jvmarg value="-Dmodule.output=./"/>
          <jvmarg value="-Djava.util.logging.config.file=src/config/trunk/non-clustered/logging.properties"/>

Modified: branches/hornetq-416/docs/user-manual/en/configuration-index.xml
===================================================================
--- branches/hornetq-416/docs/user-manual/en/configuration-index.xml	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/docs/user-manual/en/configuration-index.xml	2010-11-01 10:17:23 UTC (rev 9826)
@@ -1026,7 +1026,7 @@
                             <entry>generic</entry>
                         </row>
                         <row>
-                            <entry id="configuration.connection-factory.signature">
+                            <entry id="configuration.connection-factory.signature.xa">
                                <link linkend="using-jms.configure.factory.types">connection-factory.xa</link>
                             </entry>
                             <entry>Boolean</entry>

Modified: branches/hornetq-416/docs/user-manual/en/embedding-hornetq.xml
===================================================================
--- branches/hornetq-416/docs/user-manual/en/embedding-hornetq.xml	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/docs/user-manual/en/embedding-hornetq.xml	2010-11-01 10:17:23 UTC (rev 9826)
@@ -46,19 +46,8 @@
     two different helper classes for this depending on whether your using the
     HornetQ Core API or JMS.</para>
 
-<<<<<<< .mine
     <section>
       <title>Core API Only</title>
-=======
-config.setAcceptorConfigurations(transports);</programlisting>
-        <para>You need to instantiate and start HornetQ server. The class <literal
-                >org.hornetq.api.core.server.HornetQ</literal> has a few static methods for creating
-            servers with common configurations.</para>
-        <programlisting>import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServers;
-            
->>>>>>> .r9629
-
       <para>For instantiating a core HornetQ Server only, the steps are pretty
       simple. The example requires that you have defined a configuration file
       <literal>hornetq-configuration.xml</literal> in your
@@ -66,38 +55,9 @@
 
 ...
 
-<<<<<<< .mine
 EmbeddedHornetQ embedded = new EmbeddedHornetQ();
 embedded.start();
-=======
-HornetQServer server = HornetQServers.newHornetQServer(config);
->>>>>>> .r9629
 
-<<<<<<< .mine
-// Assuming you defined an "in-vm" acceptor within your hornetq-configuration.xml file
-=======
-server.start();</programlisting>
-        <para>You also have the option of instantiating <literal>HornetQServerImpl</literal>
-            directly:</para>
-        <programlisting>HornetQServer server = new HornetQServerImpl(config);
-server.start();</programlisting>
-    </section>
-    <section>
-        <title>Dependency Frameworks</title>
-        <para>You may also choose to use a dependency injection framework such as <trademark>JBoss
-                Micro Container</trademark> or <trademark>Spring Framework</trademark>.</para>
-        <para>HornetQ standalone uses JBoss Micro Container as the injection framework. <literal
-                >HornetQBootstrapServer</literal> and <literal>hornetq-beans.xml</literal> which are
-            part of the HornetQ distribution provide a very complete implementation of what's needed
-            to bootstrap the server using JBoss Micro Container. </para>
-        <para>When using JBoss Micro Container, you need to provide an XML file declaring the
-                <literal>HornetQServer</literal> and <literal>Configuration</literal> object, you
-            can also inject a security manager and a MBean server if you want, but those are
-            optional.</para>
-        <para>A very basic XML Bean declaration for the JBoss Micro Container would be:</para>
-        <programlisting>&lt;?xml version="1.0" encoding="UTF-8"?>
->>>>>>> .r9629
-
 ClientSessionFactory nettyFactory =  HornetQClient.createClientSessionFactory(
                                         new TransportConfiguration(
                                            InVMConnectorFactory.class.getName()));

Modified: branches/hornetq-416/examples/common/build.xml
===================================================================
--- branches/hornetq-416/examples/common/build.xml	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/examples/common/build.xml	2010-11-01 10:17:23 UTC (rev 9826)
@@ -140,6 +140,7 @@
       <!--<echo>client classpath = ${clientClasspath}</echo>-->
       <property file="${imported.basedir}/config/server.properties"/>
       <java classname="${example.classname}" fork="true" resultproperty="example-result">
+         <jvmarg line="${client.args}"/> 
          <jvmarg value="-Dhornetq.example.server.classpath=${serverclasspath}"/>
          <jvmarg value="-Dhornetq.example.server.args=${server.args}"/>
          <jvmarg value="-Dhornetq.example.logserveroutput=${hornetq.example.logserveroutput}"/>

Modified: branches/hornetq-416/examples/core/twitter-connector/build.xml
===================================================================
--- branches/hornetq-416/examples/core/twitter-connector/build.xml	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/examples/core/twitter-connector/build.xml	2010-11-01 10:17:23 UTC (rev 9826)
@@ -19,10 +19,13 @@
    <import file="../../common/build.xml"/>
    <property environment='env'/>
 
-   <target name="check" unless="env.TWITTER_USERNAME">
+   <target name="check" unless="env.TWITTER_CONSUMER_KEY">
       <echo>**************************************************************************</echo>
       <echo>* Please set the twitter account:                                        *</echo>
-      <echo>* ./build.sh -Denv.TWITTER_USERNAME=user -Denv.TWITTER_PASSWORD=password *</echo>
+      <echo>* ./build.sh -Denv.TWITTER_CONSUMER_KEY=consumerKey \                    *</echo>
+      <echo>*             -Denv.TWITTER_CONSUMER_SECRET=consumerSecret \             *</echo>
+      <echo>*             -Denv.TWITTER_ACCESS_TOKEN=accessToken \                   *</echo>
+      <echo>*             -Denv.TWITTER_ACCESS_TOKEN_SECRET=accessTokenSecret        *</echo>
       <echo>**************************************************************************</echo>
       <fail message="run example failed"/>
    </target>
@@ -30,10 +33,11 @@
    <target name="run" depends="check">
       <antcall target="runExample">
          <param name="example.classname" value="org.hornetq.core.example.TwitterConnectorExample"/>
+         <param name="client.args" value="-Dtwitter.example.alternativeMessage=${env.TWITTER_EXAMPLE_ALTERNATIVE_MESSAGE}"/>
 	 <!-- HTTP proxy settings
          <param name="server.args" value="-Dtwitter4j.http.proxyHost=your.proxy.server -Dtwitter4j.http.proxyPort=your.proxy.port"/>
 	 -->
-         <param name="server.args" value="-Dtwitter.username=${env.TWITTER_USERNAME} -Dtwitter.password=${env.TWITTER_PASSWORD}"/>
+         <param name="server.args" value="-Dtwitter.consumerKey=${env.TWITTER_CONSUMER_KEY} -Dtwitter.consumerSecret=${env.TWITTER_CONSUMER_SECRET} -Dtwitter.accessToken=${env.TWITTER_ACCESS_TOKEN} -Dtwitter.accessTokenSecret=${env.TWITTER_ACCESS_TOKEN_SECRET}"/>
       </antcall>
    </target>
 

Modified: branches/hornetq-416/examples/core/twitter-connector/server0/hornetq-configuration.xml
===================================================================
--- branches/hornetq-416/examples/core/twitter-connector/server0/hornetq-configuration.xml	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/examples/core/twitter-connector/server0/hornetq-configuration.xml	2010-11-01 10:17:23 UTC (rev 9826)
@@ -13,7 +13,7 @@
    <!-- Acceptors -->
    <acceptors>
       <acceptor name="netty-acceptor">
-         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>        
+         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
       </acceptor>
    </acceptors>
 
@@ -44,15 +44,19 @@
       <connector-service name="my-incoming-tweets">
          <factory-class>org.hornetq.integration.twitter.TwitterIncomingConnectorServiceFactory</factory-class>
          <param key="queue" value="queue.incomingQueue"/>
-         <param key="username" value="${twitter.username}"/>
-         <param key="password" value="${twitter.password}"/>
+         <param key="consumerKey" value="${twitter.consumerKey}"/>
+         <param key="consumerSecret" value="${twitter.consumerSecret}"/>
+         <param key="accessToken" value="${twitter.accessToken}"/>
+         <param key="accessTokenSecret" value="${twitter.accessTokenSecret}"/>
          <param key="interval" value="60"/>
       </connector-service>
        <connector-service name="my-outgoing-tweets">
          <factory-class>org.hornetq.integration.twitter.TwitterOutgoingConnectorServiceFactory</factory-class>
          <param key="queue" value="queue.outgoingQueue"/>
-         <param key="username" value="${twitter.username}"/>
-         <param key="password" value="${twitter.password}"/>
+         <param key="consumerKey" value="${twitter.consumerKey}"/>
+         <param key="consumerSecret" value="${twitter.consumerSecret}"/>
+         <param key="accessToken" value="${twitter.accessToken}"/>
+         <param key="accessTokenSecret" value="${twitter.accessTokenSecret}"/>
       </connector-service>
    </connector-services>
    

Modified: branches/hornetq-416/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java
===================================================================
--- branches/hornetq-416/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -44,6 +44,11 @@
       ClientSession session = null;
       try
       {
+         String testMessage = System.currentTimeMillis() + ": " + System.getProperty("twitter.example.alternativeMessage");
+         if(testMessage == null || testMessage.trim().equals("")) {
+            testMessage = System.currentTimeMillis() + ": ### Hello, HornetQ fans!! We are now experiencing so fast, so reliable and so exciting messaging never seen before ;-) ###";
+         }
+
          // Step 1. Create a ClientSessionFactory.
          csf = HornetQClient.createClientSessionFactory (new TransportConfiguration(NettyConnectorFactory.class.getName()));
 
@@ -58,7 +63,6 @@
 
          // Step 5. Create a core message.
          ClientMessage cm = session.createMessage(org.hornetq.api.core.Message.TEXT_TYPE,true);
-         String testMessage = System.currentTimeMillis() + ": ### Hello, HornetQ fans!! We are now experiencing so fast, so reliable and so exciting messaging never seen before ;-) ###";
          cm.getBodyBuffer().writeString(testMessage);
 
          // Step 6. Send a message to queue.outgoingQueue.
@@ -76,13 +80,21 @@
          ClientMessage received = cc.receive(70 * 1000);
          received.acknowledge();
          String receivedText = received.getBodyBuffer().readString();
-         System.out.println("#### Received a message from " + INCOMING_QUEUE + ": " + receivedText);
 
-         if(!receivedText.equals(testMessage))
+         while(!receivedText.equals(testMessage))
          {
-            return false;
+            // ignoring other tweets
+            received = cc.receiveImmediate();
+            if(received == null) {
+               // no other tweets. test message has gone...
+               return false;
+            }
+            
+            received.acknowledge();
+            receivedText = received.getBodyBuffer().readString();
          }
-         
+
+         System.out.println("#### Received a message from " + INCOMING_QUEUE + ": " + receivedText);
          return true;
       }
       finally

Modified: branches/hornetq-416/hornetq-rest/docbook/reference/en/master.xml
===================================================================
--- branches/hornetq-416/hornetq-rest/docbook/reference/en/master.xml	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/hornetq-rest/docbook/reference/en/master.xml	2010-11-01 10:17:23 UTC (rev 9826)
@@ -870,6 +870,16 @@
           the server. Only usable on topics.</para>
         </listitem>
       </varlistentry>
+      <varlistentry>
+        <term>selector</term>
+
+        <listitem>
+          <para>This is an optional JMS selector string. The HornetQ REST
+          interface adds HTTP headers to the JMS message for REST produced
+          messages. HTTP headers are prefixed with "http_" and every '-'
+          character is converted to a '$'.</para>
+        </listitem>
+      </varlistentry>
     </variablelist>
 
     <sect1>
@@ -1480,6 +1490,10 @@
 
       <programlisting>&lt;push-registration&gt;
    &lt;durable&gt;false&lt;/durable&gt;
+   &lt;selector&gt;&lt;![CDATA[ 
+         SomeAttribute &gt; 1 
+       ]]&gt;
+   &lt;/selector&gt;
    &lt;link rel="push" href="http://somewhere.com" type="application/json" method="PUT"/&gt;
 &lt;/push-registration&gt;
 </programlisting>
@@ -1493,6 +1507,10 @@
       <literal>queue-push-store-dir</literal> config variable defined in
       Chapter 2. (<literal>topic-push-store-dir</literal> for topics).</para>
 
+      <para>The <literal>selector</literal> element is optional and defines a
+      JMS message selector. You should enclose it within a CDATA block, as some
+      of the selector characters are not legal in XML.</para>
+
       <para>The <literal>link</literal> element specifies the basis of the
       interaction. The <literal>href</literal> attribute contains the URL you
       want to interact with. It is the only required attribute. The
@@ -1562,11 +1580,16 @@
       <title>The Topic Push Subscription XML</title>
 
       <para>The push XML for a topic is the same except the root element is
-      push-topic-registration. The rest of the document is the same. Here's an
+      push-topic-registration. (Also remember the <literal>selector</literal>
+      element is optional).  The rest of the document is the same. Here's an
       example of a template registration:</para>
 
       <programlisting>&lt;push-topic-registration&gt;
    &lt;durable&gt;true&lt;/durable&gt;
+   &lt;selector&gt;&lt;![CDATA[ 
+         SomeAttribute &gt; 1 
+       ]]&gt;
+   &lt;/selector&gt;
    &lt;link rel="template" href="http://somewhere.com/resources/{id}/messages" method="POST"/&gt;
 &lt;/push-topic registration&gt;</programlisting>
     </sect1>

Modified: branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/Hornetq.java
===================================================================
--- branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/Hornetq.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/Hornetq.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -130,7 +130,7 @@
 
    public static <T> T getEntity(ClientMessage msg, Class<T> type, Type genericType, ResteasyProviderFactory factory)
    {
-      int size = msg.getBodyBuffer().readInt();
+      int size = msg.getBodySize();
       if (size <= 0) return null;
 
       byte[] body = new byte[size];

Modified: branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/Jms.java
===================================================================
--- branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/Jms.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/Jms.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -2,12 +2,18 @@
 
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.rest.util.HttpMessageHelper;
 import org.jboss.resteasy.spi.ResteasyProviderFactory;
 import org.jboss.resteasy.util.GenericType;
 
+import javax.jms.BytesMessage;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.ObjectMessage;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.ext.MessageBodyReader;
+
+import java.io.ByteArrayInputStream;
 import java.lang.reflect.Type;
 
 /**
@@ -99,8 +105,12 @@
 
    public static boolean isHttpMessage(Message message)
    {
-      ClientMessage msg = ((HornetQMessage) message).getCoreMessage();
-      return Hornetq.isHttpMessage(msg);
+	   try {
+		   Boolean aBoolean = message.getBooleanProperty(HttpMessageHelper.POSTED_AS_HTTP_MESSAGE);
+		   return aBoolean != null && aBoolean.booleanValue() == true;
+	   } catch (JMSException e) {
+		   return false;
+	   }
    }
 
    /**
@@ -128,8 +138,33 @@
             throw new RuntimeException(e);
          }
       }
-      ClientMessage msg = ((HornetQMessage) message).getCoreMessage();
-      return Hornetq.getEntity(msg, type, genericType, factory);
+      BytesMessage bytesMessage = (BytesMessage)message;
+      
+      try
+      {
+    	  long size = bytesMessage.getBodyLength();
+    	  if (size <= 0) return null;
+
+    	  byte[] body = new byte[(int)size];
+    	  bytesMessage.readBytes(body);
+
+    	  String contentType = message.getStringProperty(HttpHeaderProperty.CONTENT_TYPE);
+    	  if (contentType == null)
+    	  {
+    		  throw new UnknownMediaType("Message did not have a Content-Type header cannot extract entity");
+    	  }
+    	  MediaType ct = MediaType.valueOf(contentType);
+    	  MessageBodyReader<T> reader = factory.getMessageBodyReader(type, genericType, null, ct);
+    	  if (reader == null)
+    	  {
+    		  throw new UnmarshalException("Unable to find a JAX-RS reader for type " + type.getName() + " and media type " + contentType);
+    	  }
+    	  return reader.readFrom(type, genericType, null, ct, null, new ByteArrayInputStream(body));
+      }
+      catch (Exception e)
+      {
+    	  throw new RuntimeException(e);
+      }
    }
 
 }

Modified: branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/AcknowledgedQueueConsumer.java
===================================================================
--- branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/AcknowledgedQueueConsumer.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/AcknowledgedQueueConsumer.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -31,10 +31,10 @@
    protected String startup = Long.toString(System.currentTimeMillis());
    protected volatile Acknowledgement ack;
 
-   public AcknowledgedQueueConsumer(ClientSessionFactory factory, String destination, String id, DestinationServiceManager serviceManager)
+   public AcknowledgedQueueConsumer(ClientSessionFactory factory, String destination, String id, DestinationServiceManager serviceManager, String selector)
            throws HornetQException
    {
-      super(factory, destination, id, serviceManager);
+      super(factory, destination, id, serviceManager, selector);
       autoAck = false;
    }
 
@@ -187,9 +187,7 @@
 
       try
       {
-         session = factory.createSession();
-         consumer = session.createConsumer(destination);
-         session.start();
+         createSession();
       }
       catch (Exception e)
       {

Modified: branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumedHttpMessage.java
===================================================================
--- branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumedHttpMessage.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumedHttpMessage.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -23,7 +23,7 @@
       buildHeaders(builder);
       if (data == null)
       {
-         int size = message.getBodyBuffer().readInt();
+         int size = message.getBodySize();
          if (size > 0)
          {
             data = new byte[size];

Modified: branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java
===================================================================
--- branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/ConsumersResource.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -36,6 +36,9 @@
    protected int consumerTimeoutSeconds;
    protected DestinationServiceManager serviceManager;
 
+   protected static final int ACKNOWLEDGED = 0x01;
+   protected static final int SELECTOR_SET = 0x02;
+
    public DestinationServiceManager getServiceManager()
    {
       return serviceManager;
@@ -78,6 +81,7 @@
 
    private Object timeoutLock = new Object();
 
+   @Override
    public void testTimeout(String target)
    {
       synchronized (timeoutLock)
@@ -107,32 +111,41 @@
 
    @POST
    public Response createSubscription(@FormParam("autoAck") @DefaultValue("true") boolean autoAck,
+                                      @FormParam("selector") String selector,
                                       @Context UriInfo uriInfo)
    {
       try
       {
          QueueConsumer consumer = null;
+         int attributes = 0;
+         if (selector != null)
+         {
+            attributes = attributes | SELECTOR_SET;
+         }
+         
          if (autoAck)
          {
-            consumer = createConsumer();
+            consumer = createConsumer(selector);
          }
          else
          {
-            consumer = createAcknowledgedConsumer();
+            attributes |= ACKNOWLEDGED;
+            consumer = createAcknowledgedConsumer(selector);
          }
 
+         String attributesSegment = "attributes-" + attributes;
          UriBuilder location = uriInfo.getAbsolutePathBuilder();
-         if (autoAck) location.path("auto-ack");
-         else location.path("acknowledged");
+         location.path(attributesSegment);
          location.path(consumer.getId());
          Response.ResponseBuilder builder = Response.created(location.build());
+
          if (autoAck)
          {
-            QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/auto-ack/" + consumer.getId(), "-1");
+            QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment +"/" + consumer.getId(), "-1");
          }
          else
          {
-            AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/acknowledged/" + consumer.getId(), "-1");
+            AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/" + attributesSegment +"/" + consumer.getId(), "-1");
 
          }
          return builder.build();
@@ -147,11 +160,11 @@
       }
    }
 
-   public QueueConsumer createConsumer()
+   public QueueConsumer createConsumer(String selector)
            throws HornetQException
    {
       String genId = sessionCounter.getAndIncrement() + "-queue-" + destination + "-" + startup;
-      QueueConsumer consumer = new QueueConsumer(sessionFactory, destination, genId, serviceManager);
+      QueueConsumer consumer = new QueueConsumer(sessionFactory, destination, genId, serviceManager, selector);
       synchronized (timeoutLock)
       {
          queueConsumers.put(genId, consumer);
@@ -160,11 +173,11 @@
       return consumer;
    }
 
-   public QueueConsumer createAcknowledgedConsumer()
+   public QueueConsumer createAcknowledgedConsumer(String selector)
            throws HornetQException
    {
       String genId = sessionCounter.getAndIncrement() + "-queue-" + destination + "-" + startup;
-      QueueConsumer consumer = new AcknowledgedQueueConsumer(sessionFactory, destination, genId, serviceManager);
+      QueueConsumer consumer = new AcknowledgedQueueConsumer(sessionFactory, destination, genId, serviceManager, selector);
       synchronized (timeoutLock)
       {
          queueConsumers.put(genId, consumer);
@@ -173,85 +186,81 @@
       return consumer;
    }
 
-   @Path("auto-ack/{consumer-id}")
+   @Path("attributes-{attributes}/{consumer-id}")
    @GET
-   public Response getConsumer(@PathParam("consumer-id") String consumerId,
+   public Response getConsumer(@PathParam("attributes") int attributes,
+                               @PathParam("consumer-id") String consumerId,
                                @Context UriInfo uriInfo) throws Exception
    {
-      return headConsumer(consumerId, uriInfo);
+      return headConsumer(attributes, consumerId, uriInfo);
    }
 
-   @Path("auto-ack/{consumer-id}")
+   @Path("attributes-{attributes}/{consumer-id}")
    @HEAD
-   public Response headConsumer(@PathParam("consumer-id") String consumerId,
+   public Response headConsumer(@PathParam("attributes") int attributes,
+                                @PathParam("consumer-id") String consumerId,
                                 @Context UriInfo uriInfo) throws Exception
    {
-      QueueConsumer consumer = findConsumer(consumerId);
+      QueueConsumer consumer = findConsumer(attributes, consumerId, uriInfo);
       Response.ResponseBuilder builder = Response.noContent();
       // we synchronize just in case a failed request is still processing
       synchronized (consumer)
       {
-         QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/acknowledged/" + consumer.getId(), Long.toString(consumer.getConsumeIndex()));
+         if ( (attributes & ACKNOWLEDGED) > 0)
+         {
+            AcknowledgedQueueConsumer ackedConsumer = (AcknowledgedQueueConsumer)consumer;
+            Acknowledgement ack = ackedConsumer.getAck();
+            if (ack == null || ack.wasSet())
+            {
+               AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/attributes-" + attributes + "/" + consumer.getId(), Long.toString(consumer.getConsumeIndex()));
+            }
+            else
+            {
+               ackedConsumer.setAcknowledgementLink(builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/attributes-" + attributes + "/" + consumer.getId());
+            }
+
+         }
+         else
+         {
+            QueueConsumer.setConsumeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/attributes-" + attributes + "/" + consumer.getId(), Long.toString(consumer.getConsumeIndex()));
+         }
       }
       return builder.build();
    }
 
-   @Path("auto-ack/{consumer-id}")
+   @Path("attributes-{attributes}/{consumer-id}")
    public QueueConsumer findConsumer(
-           @PathParam("consumer-id") String consumerId) throws Exception
+           @PathParam("attributes") int attributes,
+           @PathParam("consumer-id") String consumerId,
+           @Context UriInfo uriInfo) throws Exception
    {
       QueueConsumer consumer = queueConsumers.get(consumerId);
       if (consumer == null)
       {
-         QueueConsumer tmp = new QueueConsumer(sessionFactory, destination, consumerId, serviceManager);
-         consumer = addConsumerToMap(consumerId, tmp);
-      }
-      return consumer;
-   }
+         if ( (attributes & SELECTOR_SET) > 0)
+         {
 
-   @Path("acknowledged/{consumer-id}")
-   @GET
-   public Response getAcknowledgedConsumer(@PathParam("consumer-id") String consumerId,
-                                           @Context UriInfo uriInfo) throws Exception
-   {
-      return headAcknowledgedConsumer(consumerId, uriInfo);
-   }
+            Response.ResponseBuilder builder = Response.status(Response.Status.GONE)
+                    .entity("Cannot reconnect to selector-based consumer.  You must recreate the consumer session.")
+                    .type("text/plain");
+            UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
+            uriBuilder.path(uriInfo.getMatchedURIs().get(1));
+            serviceManager.getLinkStrategy().setLinkHeader(builder, "pull-consumers", "pull-consumers", uriBuilder.build().toString(), null);
+            throw new WebApplicationException(builder.build());
+            
+         }
+         if ( (attributes & ACKNOWLEDGED) > 0)
+         {
+            QueueConsumer tmp = new AcknowledgedQueueConsumer(sessionFactory, destination, consumerId, serviceManager, null);
+            consumer = addConsumerToMap(consumerId, tmp);
 
-   @Path("acknowledged/{consumer-id}")
-   @HEAD
-   public Response headAcknowledgedConsumer(@PathParam("consumer-id") String consumerId,
-                                            @Context UriInfo uriInfo) throws Exception
-   {
-      AcknowledgedQueueConsumer consumer = (AcknowledgedQueueConsumer) findAcknowledgedConsumer(consumerId);
-      Response.ResponseBuilder builder = Response.ok();
-      // we synchronize just in case a failed request is still processing
-      synchronized (consumer)
-      {
-         Acknowledgement ack = consumer.getAck();
-         if (ack == null || ack.wasSet())
-         {
-            AcknowledgedQueueConsumer.setAcknowledgeNextLink(serviceManager.getLinkStrategy(), builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/acknowledged/" + consumer.getId(), Long.toString(consumer.getConsumeIndex()));
          }
          else
          {
-            consumer.setAcknowledgementLink(builder, uriInfo, uriInfo.getMatchedURIs().get(1) + "/acknowledged/" + consumer.getId());
+            QueueConsumer tmp = new QueueConsumer(sessionFactory, destination, consumerId, serviceManager, null);
+            consumer = addConsumerToMap(consumerId, tmp);
          }
       }
-      return builder.build();
-   }
-
-
-   @Path("acknowledged/{consumer-id}")
-   public QueueConsumer findAcknowledgedConsumer(
-           @PathParam("consumer-id") String consumerId) throws Exception
-   {
-      QueueConsumer consumer = queueConsumers.get(consumerId);
-      if (consumer == null)
-      {
-         QueueConsumer tmp = new AcknowledgedQueueConsumer(sessionFactory, destination, consumerId, serviceManager);
-         ;
-         consumer = addConsumerToMap(consumerId, tmp);
-      }
       return consumer;
    }
 
@@ -275,16 +284,8 @@
    }
 
 
-   @Path("acknowledged/{consumer-id}")
+   @Path("attributes-{attributes}/{consumer-id}")
    @DELETE
-   public void closeAcknowledgedSession(
-           @PathParam("consumer-id") String consumerId)
-   {
-      closeSession(consumerId);
-   }
-
-   @Path("auto-ack/{consumer-id}")
-   @DELETE
    public void closeSession(
            @PathParam("consumer-id") String consumerId)
    {

Modified: branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java
===================================================================
--- branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/PostMessage.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -6,6 +6,7 @@
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.rest.util.HttpMessageHelper;
+import org.hornetq.api.core.Message;
 
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
@@ -218,7 +219,7 @@
 
    protected ClientMessage createHornetQMessage(HttpHeaders headers, byte[] body, boolean durable, ClientSession session) throws Exception
    {
-      ClientMessage message = session.createMessage(durable);
+      ClientMessage message = session.createMessage(Message.BYTES_TYPE, durable);
       HttpMessageHelper.writeHttpMessage(headers, body, message);
       return message;
    }

Modified: branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueConsumer.java
===================================================================
--- branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueConsumer.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/QueueConsumer.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -5,6 +5,7 @@
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.jms.client.SelectorTranslator;
 import org.hornetq.rest.util.Constants;
 import org.hornetq.rest.util.LinkStrategy;
 
@@ -37,6 +38,7 @@
    protected long lastPing = System.currentTimeMillis();
    protected DestinationServiceManager serviceManager;
    protected boolean autoAck = true;
+   protected String selector;
 
    /**
     * token used to create consume-next links
@@ -70,14 +72,15 @@
       lastPing = System.currentTimeMillis();
    }
 
-   public QueueConsumer(ClientSessionFactory factory, String destination, String id, DestinationServiceManager serviceManager) throws HornetQException
+   public QueueConsumer(ClientSessionFactory factory, String destination, String id, DestinationServiceManager serviceManager, String selector) throws HornetQException
    {
       this.factory = factory;
       this.destination = destination;
       this.id = id;
       this.serviceManager = serviceManager;
+      this.selector = selector;
 
-      createSession(factory, destination);
+      createSession();
    }
 
    public String getId()
@@ -191,11 +194,18 @@
       }
    }
 
-   protected void createSession(ClientSessionFactory factory, String destination)
+   protected void createSession()
            throws HornetQException
    {
       session = factory.createSession(true, true);
-      consumer = session.createConsumer(destination);
+      if (selector == null)
+      {
+         consumer = session.createConsumer(destination);
+      }
+      else
+      {
+         consumer = session.createConsumer(destination, SelectorTranslator.convertToHornetQFilterString(selector));
+      }
       session.start();
    }
 

Modified: branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java
===================================================================
--- branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/PushConsumer.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -7,6 +7,7 @@
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.MessageHandler;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.jms.client.SelectorTranslator;
 import org.hornetq.rest.queue.push.xml.PushRegistration;
 
 /**
@@ -68,7 +69,14 @@
       strategy.start();
 
       session = factory.createSession(false, false);
-      consumer = session.createConsumer(destination);
+      if (registration.getSelector() != null)
+      {
+         consumer = session.createConsumer(destination, SelectorTranslator.convertToHornetQFilterString(registration.getSelector()));
+      }
+      else
+      {
+         consumer = session.createConsumer(destination);
+      }
       consumer.setMessageHandler(this);
       session.start();
       log.info("Push consumer started for: " + registration.getTarget());
@@ -100,6 +108,7 @@
       }
    }
 
+   @Override
    public void onMessage(ClientMessage clientMessage)
    {
       if (strategy.push(clientMessage) == false)

Modified: branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/xml/PushRegistration.java
===================================================================
--- branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/xml/PushRegistration.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/queue/push/xml/PushRegistration.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -18,7 +18,7 @@
  */
 @XmlRootElement(name = "push-registration")
 @XmlAccessorType(XmlAccessType.PROPERTY)
-@XmlType(propOrder = {"destination", "durable", "target", "authenticationMechanism", "headers"})
+@XmlType(propOrder = {"destination", "durable", "selector", "target", "authenticationMechanism", "headers"})
 public class PushRegistration implements Serializable
 {
    private String id;
@@ -28,6 +28,7 @@
    private List<XmlHttpHeader> headers = new ArrayList<XmlHttpHeader>();
    private String destination;
    private Object loadedFrom;
+   private String selector;
 
    @XmlTransient
    public Object getLoadedFrom()
@@ -73,6 +74,16 @@
       this.durable = durable;
    }
 
+   public String getSelector()
+   {
+      return selector;
+   }
+
+   public void setSelector(String selector)
+   {
+      this.selector = selector;
+   }
+
    @XmlElementRef
    public XmlLink getTarget()
    {

Modified: branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/AcknowledgedSubscriptionResource.java
===================================================================
--- branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/AcknowledgedSubscriptionResource.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/AcknowledgedSubscriptionResource.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -13,10 +13,10 @@
 {
    private boolean durable;
 
-   public AcknowledgedSubscriptionResource(ClientSessionFactory factory, String destination, String id, DestinationServiceManager serviceManager)
+   public AcknowledgedSubscriptionResource(ClientSessionFactory factory, String destination, String id, DestinationServiceManager serviceManager, String selector)
            throws HornetQException
    {
-      super(factory, destination, id, serviceManager);
+      super(factory, destination, id, serviceManager, selector);
    }
 
    public boolean isDurable()

Modified: branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java
===================================================================
--- branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/FileTopicPushStore.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -18,6 +18,7 @@
       super(dirname);
    }
 
+   @Override
    public synchronized List<PushTopicRegistration> getByTopic(String topic)
    {
       List<PushTopicRegistration> list = new ArrayList<PushTopicRegistration>();

Modified: branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionResource.java
===================================================================
--- branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionResource.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionResource.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -13,10 +13,10 @@
 {
    boolean durable;
 
-   public SubscriptionResource(ClientSessionFactory factory, String destination, String id, DestinationServiceManager serviceManager)
+   public SubscriptionResource(ClientSessionFactory factory, String destination, String id, DestinationServiceManager serviceManager, String selector)
            throws HornetQException
    {
-      super(factory, destination, id, serviceManager);
+      super(factory, destination, id, serviceManager, selector);
    }
 
    public boolean isDurable()

Modified: branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java
===================================================================
--- branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/topic/SubscriptionsResource.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -84,6 +84,7 @@
 
    private Object timeoutLock = new Object();
 
+   @Override
    public void testTimeout(String target)
    {
       synchronized (timeoutLock)
@@ -127,6 +128,7 @@
    public Response createSubscription(@FormParam("durable") @DefaultValue("false") boolean durable,
                                       @FormParam("autoAck") @DefaultValue("true") boolean autoAck,
                                       @FormParam("name") String subscriptionName,
+                                      @FormParam("selector") String selector,
                                       @Context UriInfo uriInfo)
    {
       if (subscriptionName != null)
@@ -185,7 +187,7 @@
                session.createTemporaryQueue(destination, subscriptionName);
             }
          }
-         QueueConsumer consumer = createConsumer(durable, autoAck, subscriptionName);
+         QueueConsumer consumer = createConsumer(durable, autoAck, subscriptionName, selector);
          queueConsumers.put(consumer.getId(), consumer);
          serviceManager.getTimeoutTask().add(this, consumer.getId());
 
@@ -225,19 +227,19 @@
       }
    }
 
-   protected QueueConsumer createConsumer(boolean durable, boolean autoAck, String subscriptionName)
+   protected QueueConsumer createConsumer(boolean durable, boolean autoAck, String subscriptionName, String selector)
            throws HornetQException
    {
       QueueConsumer consumer;
       if (autoAck)
       {
-         SubscriptionResource subscription = new SubscriptionResource(sessionFactory, subscriptionName, subscriptionName, serviceManager);
+         SubscriptionResource subscription = new SubscriptionResource(sessionFactory, subscriptionName, subscriptionName, serviceManager, selector);
          subscription.setDurable(durable);
          consumer = subscription;
       }
       else
       {
-         AcknowledgedSubscriptionResource subscription = new AcknowledgedSubscriptionResource(sessionFactory, subscriptionName, subscriptionName, serviceManager);
+         AcknowledgedSubscriptionResource subscription = new AcknowledgedSubscriptionResource(sessionFactory, subscriptionName, subscriptionName, serviceManager, selector);
          subscription.setDurable(durable);
          consumer = subscription;
       }
@@ -375,7 +377,7 @@
             QueueConsumer tmp = null;
             try
             {
-               tmp = createConsumer(true, autoAck, subscriptionId);
+               tmp = createConsumer(true, autoAck, subscriptionId, null);
             }
             catch (HornetQException e)
             {

Modified: branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/HttpMessageHelper.java
===================================================================
--- branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/HttpMessageHelper.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/hornetq-rest/hornetq-rest/src/main/java/org/hornetq/rest/util/HttpMessageHelper.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -35,7 +35,7 @@
          if (headerName == null) continue;
          builder.header(headerName, message.getStringProperty(k));
       }
-      int size = message.getBodyBuffer().readInt();
+      int size = message.getBodySize();
       if (size > 0)
       {
          byte[] body = new byte[size];
@@ -78,20 +78,23 @@
             contentType = value;
          }
       }
-      int size = message.getBodyBuffer().readInt();
+      int size = message.getBodySize();
       if (size > 0)
       {
-         byte[] body = new byte[size];
-         message.getBodyBuffer().readBytes(body);
          Boolean aBoolean = message.getBooleanProperty(POSTED_AS_HTTP_MESSAGE);
          if (aBoolean != null && aBoolean.booleanValue())
          {
+            byte[] body = new byte[size];
+            message.getBodyBuffer().readBytes(body);
             //System.out.println("Building Message from HTTP message");
             request.body(contentType, body);
          }
          else
          {
             // assume posted as a JMS or HornetQ object message
+            size = message.getBodyBuffer().readInt();
+            byte[] body = new byte[size];
+            message.getBodyBuffer().readBytes(body);
             ByteArrayInputStream bais = new ByteArrayInputStream(body);
             Object obj = null;
             try
@@ -123,7 +126,6 @@
          }
       }
       message.putBooleanProperty(POSTED_AS_HTTP_MESSAGE, true);
-      message.getBodyBuffer().writeInt(body.length);
       message.getBodyBuffer().writeBytes(body);
    }
 

Added: branches/hornetq-416/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/SelectorTest.java
===================================================================
--- branches/hornetq-416/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/SelectorTest.java	                        (rev 0)
+++ branches/hornetq-416/hornetq-rest/hornetq-rest/src/test/java/org/hornetq/rest/test/SelectorTest.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -0,0 +1,304 @@
+package org.hornetq.rest.test;
+
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
+import org.hornetq.rest.HttpHeaderProperty;
+import org.hornetq.rest.queue.push.xml.XmlLink;
+import org.hornetq.rest.topic.PushTopicRegistration;
+import org.hornetq.rest.topic.TopicDeployment;
+import org.jboss.resteasy.client.ClientRequest;
+import org.jboss.resteasy.client.ClientResponse;
+import org.jboss.resteasy.spi.Link;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+
+import static org.jboss.resteasy.test.TestPortProvider.*;
+
+/**
+ * @author <a href="mailto:bill at burkecentral.com">Bill Burke</a>
+ * @version $Revision: 1 $
+ */
+public class SelectorTest extends MessageTestBase
+{
+   public static ConnectionFactory connectionFactory;
+   public static String topicName = HornetQDestination.createQueueAddressFromName("testTopic").toString();
+
+   @BeforeClass
+   public static void setup() throws Exception
+   {
+      connectionFactory = new HornetQJMSConnectionFactory(manager.getQueueManager().getSessionFactory());
+      System.out.println("Queue name: " + topicName);
+      TopicDeployment deployment = new TopicDeployment();
+      deployment.setDuplicatesAllowed(true);
+      deployment.setDurableSend(false);
+      deployment.setName(topicName);
+      manager.getTopicManager().deploy(deployment);
+   }
+
+   @XmlRootElement
+   public static class Order implements Serializable
+   {
+      private String name;
+      private String amount;
+
+      public String getName()
+      {
+         return name;
+      }
+
+      public void setName(String name)
+      {
+         this.name = name;
+      }
+
+      public String getAmount()
+      {
+         return amount;
+      }
+
+      public void setAmount(String amount)
+      {
+         this.amount = amount;
+      }
+
+      @Override
+      public boolean equals(Object o)
+      {
+         if (this == o) return true;
+         if (o == null || getClass() != o.getClass()) return false;
+
+         Order order = (Order) o;
+
+         if (!amount.equals(order.amount)) return false;
+         if (!name.equals(order.name)) return false;
+
+         return true;
+      }
+
+      @Override
+      public String toString()
+      {
+         return "Order{" +
+                 "name='" + name + '\'' +
+                 '}';
+      }
+
+      @Override
+      public int hashCode()
+      {
+         int result = name.hashCode();
+         result = 31 * result + amount.hashCode();
+         return result;
+      }
+   }
+
+   public static Destination createDestination(String dest)
+   {
+      HornetQDestination destination = (HornetQDestination) HornetQDestination.fromAddress(dest);
+      System.out.println("SimpleAddress: " + destination.getSimpleAddress());
+      return destination;
+   }
+
+   public static void publish(String dest, Serializable object, String contentType, String tag) throws Exception
+   {
+      Connection conn = connectionFactory.createConnection();
+      try
+      {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Destination destination = createDestination(dest);
+         MessageProducer producer = session.createProducer(destination);
+         ObjectMessage message = session.createObjectMessage();
+
+         if (contentType != null)
+         {
+            message.setStringProperty(HttpHeaderProperty.CONTENT_TYPE, contentType);
+         }
+         if (tag != null)
+         {
+            message.setStringProperty("MyTag", tag);
+         }
+         message.setObject(object);
+
+         producer.send(message);
+      }
+      finally
+      {
+         conn.close();
+      }
+   }
+
+   @Path("/push")
+   public static class PushReceiver
+   {
+      public static Order oneOrder;
+      public static Order twoOrder;
+
+      @POST
+      @Path("one")
+      public void one(Order order)
+      {
+         oneOrder = order;
+      }
+
+      @POST
+      @Path("two")
+      public void two(Order order)
+      {
+         twoOrder = order;   
+      }
+
+
+   }
+
+   @Test
+   public void testPush() throws Exception
+   {
+      server.getJaxrsServer().getDeployment().getRegistry().addPerRequestResource(PushReceiver.class);
+      ClientRequest request = new ClientRequest(generateURL("/topics/" + topicName));
+
+      ClientResponse response = request.head();
+      Assert.assertEquals(200, response.getStatus());
+      Link consumers = MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response, "push-subscriptions");
+      System.out.println("push: " + consumers);
+
+      PushTopicRegistration oneReg = new PushTopicRegistration();
+      oneReg.setDurable(false);
+      XmlLink target = new XmlLink();
+      target.setMethod("post");
+      target.setHref(generateURL("/push/one"));
+      target.setType("application/xml");
+      oneReg.setTarget(target);
+      oneReg.setSelector("MyTag = '1'");
+      response = consumers.request().body("application/xml", oneReg).post();
+      Link oneSubscription = response.getLocation();
+
+      PushTopicRegistration twoReg = new PushTopicRegistration();
+      twoReg.setDurable(false);
+      target = new XmlLink();
+      target.setMethod("post");
+      target.setHref(generateURL("/push/two"));
+      target.setType("application/xml");
+      twoReg.setTarget(target);
+      twoReg.setSelector("MyTag = '2'");
+      response = consumers.request().body("application/xml", twoReg).post();
+      Link twoSubscription = response.getLocation();
+
+      Order order = new Order();
+      order.setName("1");
+      order.setAmount("$5.00");
+      publish(topicName, order, null, "1");
+      Thread.sleep(200);
+      Assert.assertEquals(order, PushReceiver.oneOrder);
+
+      order.setName("2");
+      publish(topicName, order, null, "2");
+      Thread.sleep(200);
+      Assert.assertEquals(order, PushReceiver.twoOrder);
+
+      order.setName("3");
+      publish(topicName, order, null, "2");
+      Thread.sleep(200);
+      Assert.assertEquals(order, PushReceiver.twoOrder);
+
+      order.setName("4");
+      publish(topicName, order, null, "1");
+      Thread.sleep(200);
+      Assert.assertEquals(order, PushReceiver.oneOrder);
+
+      order.setName("5");
+      publish(topicName, order, null, "1");
+      Thread.sleep(200);
+      Assert.assertEquals(order, PushReceiver.oneOrder);
+
+      order.setName("6");
+      publish(topicName, order, null, "1");
+      Thread.sleep(200);
+      Assert.assertEquals(order, PushReceiver.oneOrder);
+
+      oneSubscription.request().delete();
+      twoSubscription.request().delete();
+
+
+   }
+
+
+   @Test
+   public void testPull() throws Exception
+   {
+      ClientRequest request = new ClientRequest(generateURL("/topics/" + topicName));
+
+      ClientResponse response = request.head();
+      Assert.assertEquals(200, response.getStatus());
+      Link consumers = MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response, "pull-subscriptions");
+      System.out.println("pull: " + consumers);
+      response = consumers.request().formParameter("autoAck", "true")
+              .formParameter("selector", "MyTag = '1'").post();
+
+      Link consumeOne = MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response, "consume-next");
+      System.out.println("consumeOne: " + consumeOne);
+      response = consumers.request().formParameter("autoAck", "true")
+              .formParameter("selector", "MyTag = '2'").post();
+      Link consumeTwo = MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), response, "consume-next");
+      System.out.println("consumeTwo: " + consumeTwo);
+
+
+      // test that Accept header is used to set content-type
+      {
+         Order order = new Order();
+         order.setName("1");
+         order.setAmount("$5.00");
+         publish(topicName, order, null, "1");
+         order.setName("2");
+         publish(topicName, order, null, "2");
+         order.setName("3");
+         publish(topicName, order, null, "2");
+         order.setName("4");
+         publish(topicName, order, null, "1");
+         order.setName("5");
+         publish(topicName, order, null, "1");
+         order.setName("6");
+         publish(topicName, order, null, "1");
+
+         {
+            order.setName("1");
+            consumeOne = consumeOrder(order, consumeOne);
+            order.setName("2");
+            consumeTwo = consumeOrder(order, consumeTwo);
+            order.setName("3");
+            consumeTwo = consumeOrder(order, consumeTwo);
+            order.setName("4");
+            consumeOne = consumeOrder(order, consumeOne);
+            order.setName("5");
+            consumeOne = consumeOrder(order, consumeOne);
+            order.setName("6");
+            consumeOne = consumeOrder(order, consumeOne);
+         }
+      }
+   }
+
+   private Link consumeOrder(Order order, Link consumeNext)
+           throws Exception
+   {
+      ClientResponse res = consumeNext.request().header("Accept-Wait", "4").accept("application/xml").post(String.class);
+      Assert.assertEquals(200, res.getStatus());
+      Assert.assertEquals("application/xml", res.getHeaders().getFirst("Content-Type").toString().toLowerCase());
+      Order order2 = (Order) res.getEntity(Order.class);
+      Assert.assertEquals(order, order2);
+      consumeNext = MessageTestBase.getLinkByTitle(manager.getQueueManager().getLinkStrategy(), res, "consume-next");
+      Assert.assertNotNull(consumeNext);
+      return consumeNext;
+   }
+}
\ No newline at end of file

Modified: branches/hornetq-416/pom.xml
===================================================================
--- branches/hornetq-416/pom.xml	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/pom.xml	2010-11-01 10:17:23 UTC (rev 9826)
@@ -253,7 +253,7 @@
       <dependency>
          <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-core</artifactId>
-           <version>2.1.2</version>
+           <version>2.1.6</version>
        </dependency>
       <!-- needed to compile the tests-->
       <dependency>

Modified: branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -549,15 +549,13 @@
       for (int i = 0; i < data; i++)
       {
          char b = (char)bytes[i];
-
-         if (b == '\n')
+         
+         if (b < 33 || b > 136)
          {
-            str.append("\\n");
+            //Unreadable characters
+            
+            str.append(bytes[i]);
          }
-         else if (b == 0)
-         {
-            str.append("NUL");
-         }
          else
          {
             str.append(b);

Modified: branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -39,6 +39,7 @@
 import org.hornetq.spi.core.protocol.ProtocolManager;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.remoting.Connection;
+import org.hornetq.spi.core.security.HornetQSecurityManager;
 import org.hornetq.utils.UUIDGenerator;
 
 /**
@@ -577,7 +578,13 @@
       String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
       String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
 
-      server.getSecurityManager().validateUser(login, passcode);
+      HornetQSecurityManager sm = server.getSecurityManager();
+      
+      // The security manager (sm) will be null when security is not enabled...
+      if (sm != null)
+      {
+         sm.validateUser(login, passcode);
+      }
 
       connection.setLogin(login);
       connection.setPasscode(passcode);

Modified: branches/hornetq-416/src/main/org/hornetq/integration/twitter/TwitterConstants.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/integration/twitter/TwitterConstants.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/src/main/org/hornetq/integration/twitter/TwitterConstants.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -50,32 +50,42 @@
    public static final Set<String> ALLOWABLE_OUTGOING_CONNECTOR_KEYS;
    public static final Set<String> REQUIRED_OUTGOING_CONNECTOR_KEYS;
 
-   public static final String USER_NAME =  "username";
-   public static final String PASSWORD = "password";
+   public static final String CONSUMER_KEY = "consumerKey";
+   public static final String CONSUMER_SECRET = "consumerSecret";
+   public static final String ACCESS_TOKEN ="accessToken";
+   public static final String ACCESS_TOKEN_SECRET = "accessTokenSecret";
    public static final String QUEUE_NAME =  "queue";
    public static final String INCOMING_INTERVAL = "interval";
 
    static
    {
       ALLOWABLE_INCOMING_CONNECTOR_KEYS = new HashSet<String>();
-      ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(USER_NAME);
-      ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(PASSWORD);
+      ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(CONSUMER_KEY);
+      ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(CONSUMER_SECRET);
+      ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN);
+      ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET);
       ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME);
       ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(INCOMING_INTERVAL);
 
       REQUIRED_INCOMING_CONNECTOR_KEYS = new HashSet<String>();
-      REQUIRED_INCOMING_CONNECTOR_KEYS.add(USER_NAME);
-      REQUIRED_INCOMING_CONNECTOR_KEYS.add(PASSWORD);
+      REQUIRED_INCOMING_CONNECTOR_KEYS.add(CONSUMER_KEY);
+      REQUIRED_INCOMING_CONNECTOR_KEYS.add(CONSUMER_SECRET);
+      REQUIRED_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN);
+      REQUIRED_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET);
       REQUIRED_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME);
 
       ALLOWABLE_OUTGOING_CONNECTOR_KEYS = new HashSet<String>();
-      ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(USER_NAME);
-      ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(PASSWORD);
+      ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_KEY); 
+      ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_SECRET);
+      ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN);
+      ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET);
       ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME);
 
       REQUIRED_OUTGOING_CONNECTOR_KEYS = new HashSet<String>();
-      REQUIRED_OUTGOING_CONNECTOR_KEYS.add(USER_NAME);
-      REQUIRED_OUTGOING_CONNECTOR_KEYS.add(PASSWORD);
+      REQUIRED_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_KEY);
+      REQUIRED_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_SECRET);
+      REQUIRED_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN);
+      REQUIRED_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET);
       REQUIRED_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME);
    }
 }

Modified: branches/hornetq-416/src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/src/main/org/hornetq/integration/twitter/impl/IncomingTweetsHandler.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -23,7 +23,7 @@
 import org.hornetq.integration.twitter.TwitterConstants;
 import org.hornetq.utils.ConfigurationHelper;
 import twitter4j.*;
-
+import twitter4j.http.AccessToken;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -39,10 +39,14 @@
 
    private final String connectorName;
 
-   private final String userName;
+   private final String consumerKey;
 
-   private final String password;
+   private final String consumerSecret;
 
+   private final String accessToken;
+
+   private final String accessTokenSecret;
+
    private final String queueName;
 
    private final int intervalSeconds;
@@ -59,17 +63,19 @@
 
    private final ScheduledExecutorService scheduledPool;
 
-   private ScheduledFuture scheduledFuture;
+   private ScheduledFuture<?> scheduledFuture;
 
-   public IncomingTweetsHandler(final String connectorName, 
+   public IncomingTweetsHandler(final String connectorName,
                                 final Map<String, Object> configuration,
                                 final StorageManager storageManager,
                                 final PostOffice postOffice,
                                 final ScheduledExecutorService scheduledThreadPool)
    {
       this.connectorName = connectorName;
-      this.userName = ConfigurationHelper.getStringProperty(TwitterConstants.USER_NAME, null, configuration);
-      this.password = ConfigurationHelper.getStringProperty(TwitterConstants.PASSWORD, null, configuration);
+      this.consumerKey = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_KEY, null, configuration);
+      this.consumerSecret = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_SECRET, null, configuration);
+      this.accessToken = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN, null, configuration);
+      this.accessTokenSecret = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN_SECRET, null, configuration);
       this.queueName = ConfigurationHelper.getStringProperty(TwitterConstants.QUEUE_NAME, null, configuration);
       Integer intervalSeconds = ConfigurationHelper.getIntProperty(TwitterConstants.INCOMING_INTERVAL, 0, configuration);
       if (intervalSeconds > 0)
@@ -95,9 +101,12 @@
 
       paging = new Paging();
       TwitterFactory tf = new TwitterFactory();
-      this.twitter = tf.getInstance(userName, password);
+      this.twitter = tf.getOAuthAuthorizedInstance(this.consumerKey,
+                                                   this.consumerSecret,
+                                                   new AccessToken(this.accessToken,
+                                                                   this.accessTokenSecret));
       this.twitter.verifyCredentials();
-
+      
       // getting latest ID
       this.paging.setCount(TwitterConstants.FIRST_ATTEMPT_PAGE_SIZE);
       ResponseList<Status> res = this.twitter.getHomeTimeline(paging);

Modified: branches/hornetq-416/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -21,6 +21,7 @@
 import org.hornetq.integration.twitter.TwitterConstants;
 import org.hornetq.utils.ConfigurationHelper;
 import twitter4j.*;
+import twitter4j.http.AccessToken;
 
 import java.util.Map;
 
@@ -34,10 +35,14 @@
 
    private final String connectorName;
 
-   private final String userName;
+   private final String consumerKey;
 
-   private final String password;
+   private final String consumerSecret;
 
+   private final String accessToken;
+
+   private final String accessTokenSecret;
+
    private final String queueName;
 
    private final PostOffice postOffice;
@@ -50,13 +55,15 @@
 
    private boolean isStarted = false;
 
-   public OutgoingTweetsHandler(final String connectorName, 
+   public OutgoingTweetsHandler(final String connectorName,
                                 final Map<String, Object> configuration,
                                 final PostOffice postOffice)
    {
       this.connectorName = connectorName;
-      this.userName = ConfigurationHelper.getStringProperty(TwitterConstants.USER_NAME, null, configuration);
-      this.password = ConfigurationHelper.getStringProperty(TwitterConstants.PASSWORD, null, configuration);
+      this.consumerKey = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_KEY, null, configuration);
+      this.consumerSecret = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_SECRET, null, configuration);
+      this.accessToken = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN, null, configuration);
+      this.accessTokenSecret = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN_SECRET, null, configuration);
       this.queueName = ConfigurationHelper.getStringProperty(TwitterConstants.QUEUE_NAME, null, configuration);
       this.postOffice = postOffice;
    }
@@ -91,8 +98,12 @@
       this.queue = (Queue)b.getBindable();
 
       TwitterFactory tf = new TwitterFactory();
-      this.twitter = tf.getInstance(userName, password);
+      this.twitter = tf.getOAuthAuthorizedInstance(this.consumerKey,
+                                                   this.consumerSecret,
+                                                   new AccessToken(this.accessToken,
+                                                                   this.accessTokenSecret));
       this.twitter.verifyCredentials();
+      
       // TODO make filter-string configurable
       // this.filter = FilterImpl.createFilter(filterString);
       this.filter = null;

Modified: branches/hornetq-416/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java
===================================================================
--- branches/hornetq-416/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java	2010-11-01 09:44:00 UTC (rev 9825)
+++ branches/hornetq-416/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java	2010-11-01 10:17:23 UTC (rev 9826)
@@ -38,6 +38,7 @@
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.tests.util.UnitTestCase;
 import twitter4j.*;
+import twitter4j.http.AccessToken;
 
 /**
  * A TwitterTest
@@ -50,19 +51,23 @@
 {
    private static final Logger log = Logger.getLogger(TwitterTest.class);
    private static final String KEY_CONNECTOR_NAME = "connector.name";
-   private static final String KEY_USERNAME = "username";
-   private static final String KEY_PASSWORD = "password";
+   private static final String KEY_CONSUMER_KEY = "consumerKey";
+   private static final String KEY_CONSUMER_SECRET = "consumerSecret";
+   private static final String KEY_ACCESS_TOKEN = "accessToken";
+   private static final String KEY_ACCESS_TOKEN_SECRET = "accessTokenSecret";
    private static final String KEY_QUEUE_NAME = "queue.name";
    
-   private static final String TWITTER_USERNAME = System.getProperty("twitter.username");
-   private static final String TWITTER_PASSWORD = System.getProperty("twitter.password");
-   
+   private static final String TWITTER_CONSUMER_KEY = System.getProperty("twitter.consumerKey");
+   private static final String TWITTER_CONSUMER_SECRET = System.getProperty("twitter.consumerSecret");
+   private static final String TWITTER_ACCESS_TOKEN = System.getProperty("twitter.accessToken");
+   private static final String TWITTER_ACCESS_TOKEN_SECRET = System.getProperty("twitter.accessTokenSecret");
+
    @Override
    protected void setUp() throws Exception
    {
-      if(TWITTER_USERNAME == null || TWITTER_PASSWORD == null)
+      if(TWITTER_CONSUMER_KEY == null || TWITTER_CONSUMER_SECRET == null || TWITTER_ACCESS_TOKEN == null || TWITTER_ACCESS_TOKEN_SECRET == null)
       {
-         throw new Exception("* * *  Please set twitter.username and twitter.password in system property  * * *");
+         throw new Exception("* * *  Please set twitter.consumerKey, twitter.consumerSecret, twitter.accessToken and twitter.accessTokenSecuret in system property  * * *");
       }
       super.setUp();
    }
@@ -101,8 +106,10 @@
    public void testIncomingWithInvalidCredentials() throws Exception
    {
       HashMap<String,String> params = new HashMap<String,String>();
-      params.put(KEY_USERNAME, "invalidUsername");
-      params.put(KEY_PASSWORD, "invalidPassword");
+      params.put(KEY_CONSUMER_KEY, "invalidConsumerKey");
+      params.put(KEY_CONSUMER_SECRET, "invalidConsumerSecret");
+      params.put(KEY_ACCESS_TOKEN, "invalidAccessToken");
+      params.put(KEY_ACCESS_TOKEN_SECRET, "invalidAcccessTokenSecret");
       internalTestIncomingFailedToInitialize(params);
    }
 
@@ -139,18 +146,14 @@
    public void testOutgoingWithInvalidCredentials() throws Exception
    {
       HashMap<String,String> params = new HashMap<String,String>();
-      params.put(KEY_USERNAME, "invalidUsername");
-      params.put(KEY_PASSWORD, "invalidPassword");
+      params.put(KEY_CONSUMER_KEY, "invalidConsumerKey");
+      params.put(KEY_CONSUMER_SECRET, "invalidConsumerSecret");
+      params.put(KEY_ACCESS_TOKEN, "invalidAccessToken");
+      params.put(KEY_ACCESS_TOKEN_SECRET, "invalidAcccessTokenSecret");
       internalTestOutgoingFailedToInitialize(params);
    }
    
-   /**
-    *  This will fail until TFJ-347 is fixed.
-    * http://twitter4j.org/jira/browse/TFJ-347
-    * 
-    * @throws Exception
-    */
-   public void _testOutgoingWithInReplyTo() throws Exception
+   public void testOutgoingWithInReplyTo() throws Exception
    {
       internalTestOutgoingWithInReplyTo();
    }
@@ -161,7 +164,10 @@
       ClientSession session = null;
       String queue = "TwitterTestQueue";
       int interval = 5;
-      Twitter twitter = new TwitterFactory().getInstance(TWITTER_USERNAME,TWITTER_PASSWORD);
+      Twitter twitter = new TwitterFactory().getOAuthAuthorizedInstance(TWITTER_CONSUMER_KEY,
+                                                                        TWITTER_CONSUMER_SECRET,
+                                                                        new AccessToken(TWITTER_ACCESS_TOKEN,
+                                                                                        TWITTER_ACCESS_TOKEN_SECRET));
       String testMessage = "TwitterTest/incoming: " + System.currentTimeMillis();
       log.debug("test incoming: " + testMessage);
       
@@ -171,8 +177,10 @@
          HashMap<String, Object> config = new HashMap<String, Object>();
          config.put(TwitterConstants.INCOMING_INTERVAL, interval);
          config.put(TwitterConstants.QUEUE_NAME, queue);
-         config.put(TwitterConstants.USER_NAME, TWITTER_USERNAME);
-         config.put(TwitterConstants.PASSWORD, TWITTER_PASSWORD);
+         config.put(TwitterConstants.CONSUMER_KEY, TWITTER_CONSUMER_KEY);
+         config.put(TwitterConstants.CONSUMER_SECRET, TWITTER_CONSUMER_SECRET);
+         config.put(TwitterConstants.ACCESS_TOKEN, TWITTER_ACCESS_TOKEN);
+         config.put(TwitterConstants.ACCESS_TOKEN_SECRET, TWITTER_ACCESS_TOKEN_SECRET);
          ConnectorServiceConfiguration inconf =
                new ConnectorServiceConfiguration(
                TwitterIncomingConnectorServiceFactory.class.getName(),
@@ -244,22 +252,32 @@
       HornetQServer server0 = null;
       String connectorName = "test-incoming-connector"; 
       String queue = "TwitterTestQueue";
-      String userName = "invalidUsername";
-      String password = "invalidPassword";
+      String consumerKey = "invalidConsumerKey";
+      String consumerSecret = "invalidConsumerSecret";
+      String accessToken = "invalidAccessToken";
+      String accessTokenSecret = "invalidAccessTokenSecret";
       int interval = 5;
       
       if(params.containsKey(KEY_CONNECTOR_NAME))
       {
          connectorName = params.get(KEY_CONNECTOR_NAME);
       }
-      if(params.containsKey(KEY_USERNAME))
+      if(params.containsKey(KEY_CONSUMER_KEY))
       {
-         userName = params.get(KEY_USERNAME);
+         consumerKey = params.get(KEY_CONSUMER_KEY);
       }
-      if(params.containsKey(KEY_PASSWORD))
+      if(params.containsKey(KEY_CONSUMER_SECRET))
       {
-         password = params.get(KEY_PASSWORD);
+         consumerSecret = params.get(KEY_CONSUMER_SECRET);
       }
+      if(params.containsKey(KEY_ACCESS_TOKEN))
+      {
+         accessToken = params.get(KEY_ACCESS_TOKEN);
+      }
+      if(params.containsKey(KEY_ACCESS_TOKEN_SECRET))
+      {
+         accessTokenSecret = params.get(KEY_ACCESS_TOKEN_SECRET);
+      }
       if(params.containsKey(KEY_QUEUE_NAME))
       {
          queue = params.get(KEY_QUEUE_NAME);
@@ -271,8 +289,10 @@
          HashMap<String, Object> config = new HashMap<String, Object>();
          config.put(TwitterConstants.INCOMING_INTERVAL, interval);
          config.put(TwitterConstants.QUEUE_NAME, queue);
-         config.put(TwitterConstants.USER_NAME, userName);
-         config.put(TwitterConstants.PASSWORD, password);
+         config.put(TwitterConstants.CONSUMER_KEY, consumerKey);
+         config.put(TwitterConstants.CONSUMER_SECRET, consumerSecret);
+         config.put(TwitterConstants.ACCESS_TOKEN, accessToken);
+         config.put(TwitterConstants.ACCESS_TOKEN_SECRET, accessTokenSecret);
          ConnectorServiceConfiguration inconf =
                new ConnectorServiceConfiguration(TwitterIncomingConnectorServiceFactory.class.getName(),
                      config,
@@ -306,7 +326,10 @@
       HornetQServer server0 = null;
       ClientSession session = null;
       String queue = "TwitterTestQueue";
-      Twitter twitter = new TwitterFactory().getInstance(TWITTER_USERNAME,TWITTER_PASSWORD);
+      Twitter twitter = new TwitterFactory().getOAuthAuthorizedInstance(TWITTER_CONSUMER_KEY,
+                                                                        TWITTER_CONSUMER_SECRET,
+                                                                        new AccessToken(TWITTER_ACCESS_TOKEN,
+                                                                                        TWITTER_ACCESS_TOKEN_SECRET));
       String testMessage = "TwitterTest/outgoing: " + System.currentTimeMillis();
       log.debug("test outgoing: " + testMessage);
 
@@ -315,8 +338,10 @@
          Configuration configuration = createDefaultConfig(false);
          HashMap<String, Object> config = new HashMap<String, Object>();
          config.put(TwitterConstants.QUEUE_NAME, queue);
-         config.put(TwitterConstants.USER_NAME, TWITTER_USERNAME);
-         config.put(TwitterConstants.PASSWORD, TWITTER_PASSWORD);
+         config.put(TwitterConstants.CONSUMER_KEY, TWITTER_CONSUMER_KEY);
+         config.put(TwitterConstants.CONSUMER_SECRET, TWITTER_CONSUMER_SECRET);
+         config.put(TwitterConstants.ACCESS_TOKEN, TWITTER_ACCESS_TOKEN);
+         config.put(TwitterConstants.ACCESS_TOKEN_SECRET, TWITTER_ACCESS_TOKEN_SECRET);
          ConnectorServiceConfiguration outconf =
                new ConnectorServiceConfiguration(TwitterOutgoingConnectorServiceFactory.class.getName(),
                      config,
@@ -388,25 +413,35 @@
    protected void internalTestOutgoingFailedToInitialize(HashMap<String,String> params) throws Exception
    {
       HornetQServer server0 = null;
-      String connectorName = "test-outgoing-connector"; 
+      String connectorName = "test-outgoing-connector";
       String queue = "TwitterTestQueue";
-      String userName = TWITTER_USERNAME;
-      String password = TWITTER_PASSWORD;
+      String consumerKey = TWITTER_CONSUMER_KEY;
+      String consumerSecret = TWITTER_CONSUMER_SECRET;
+      String accessToken = TWITTER_ACCESS_TOKEN;
+      String accessTokenSecret = TWITTER_ACCESS_TOKEN_SECRET;
       
       if(params.containsKey(KEY_CONNECTOR_NAME))
       {
          connectorName = params.get(KEY_CONNECTOR_NAME);
       }
-      if(params.containsKey(KEY_USERNAME))
+      if (params.containsKey(KEY_CONSUMER_KEY))
       {
-         userName = params.get(KEY_USERNAME);
+         consumerKey = params.get(KEY_CONSUMER_KEY);
       }
-      if(params.containsKey(KEY_PASSWORD))
+      if (params.containsKey(KEY_CONSUMER_SECRET))
       {
-         password = params.get(KEY_PASSWORD);
+         consumerSecret = params.get(KEY_CONSUMER_SECRET);
       }
-      if(params.containsKey(KEY_QUEUE_NAME))
+      if (params.containsKey(KEY_ACCESS_TOKEN))
       {
+         accessToken = params.get(KEY_ACCESS_TOKEN);
+      }
+      if (params.containsKey(KEY_ACCESS_TOKEN_SECRET))
+      {
+         accessTokenSecret = params.get(KEY_ACCESS_TOKEN_SECRET);
+      }
+      if (params.containsKey(KEY_QUEUE_NAME))
+      {
          queue = params.get(KEY_QUEUE_NAME);
       }
       
@@ -415,12 +450,14 @@
          Configuration configuration = createDefaultConfig(false);
          HashMap<String, Object> config = new HashMap<String, Object>();
          config.put(TwitterConstants.QUEUE_NAME, queue);
-         config.put(TwitterConstants.USER_NAME, userName);
-         config.put(TwitterConstants.PASSWORD, password);
+         config.put(TwitterConstants.CONSUMER_KEY, consumerKey);
+         config.put(TwitterConstants.CONSUMER_SECRET, consumerSecret);
+         config.put(TwitterConstants.ACCESS_TOKEN, accessToken);
+         config.put(TwitterConstants.ACCESS_TOKEN_SECRET, accessTokenSecret);
          ConnectorServiceConfiguration outconf =
                new ConnectorServiceConfiguration(TwitterOutgoingConnectorServiceFactory.class.getName(),
                      config,
-               "test-outgoing-connector");
+               connectorName);
          configuration.getConnectorServiceConfigurations().add(outconf);
          CoreQueueConfiguration qc = new CoreQueueConfiguration(queue, queue, null, false);
          configuration.getQueueConfigurations().add(qc);
@@ -446,16 +483,21 @@
       HornetQServer server0 = null;
       ClientSession session = null;
       String queue = "TwitterTestQueue";
-      Twitter twitter = new TwitterFactory().getInstance(TWITTER_USERNAME,TWITTER_PASSWORD);
+      Twitter twitter = new TwitterFactory().getOAuthAuthorizedInstance(TWITTER_CONSUMER_KEY,
+                                                                        TWITTER_CONSUMER_SECRET,
+                                                                        new AccessToken(TWITTER_ACCESS_TOKEN,
+                                                                                        TWITTER_ACCESS_TOKEN_SECRET));
       String testMessage = "TwitterTest/outgoing with in_reply_to: " + System.currentTimeMillis();
-      String replyMessage = "@" + TWITTER_USERNAME + " TwitterTest/outgoing reply: " + System.currentTimeMillis();
+      String replyMessage = "@" + twitter.getScreenName() + " TwitterTest/outgoing reply: " + System.currentTimeMillis();
       try
       {
          Configuration configuration = createDefaultConfig(false);
          HashMap<String, Object> config = new HashMap<String, Object>();
          config.put(TwitterConstants.QUEUE_NAME, queue);
-         config.put(TwitterConstants.USER_NAME, TWITTER_USERNAME);
-         config.put(TwitterConstants.PASSWORD, TWITTER_PASSWORD);
+         config.put(TwitterConstants.CONSUMER_KEY, TWITTER_CONSUMER_KEY);
+         config.put(TwitterConstants.CONSUMER_SECRET, TWITTER_CONSUMER_SECRET);
+         config.put(TwitterConstants.ACCESS_TOKEN, TWITTER_ACCESS_TOKEN);
+         config.put(TwitterConstants.ACCESS_TOKEN_SECRET, TWITTER_ACCESS_TOKEN_SECRET);
          ConnectorServiceConfiguration outconf =
                new ConnectorServiceConfiguration(TwitterOutgoingConnectorServiceFactory.class.getName(),
                      config,



More information about the hornetq-commits mailing list