六狼论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

新浪微博账号登录

只需一步,快速开始

搜索
查看: 197|回复: 0

ActiveMQ基于derby数据库的spring整合

[复制链接]

升级  73.8%

273

主题

273

主题

273

主题

进士

Rank: 4

积分
869
 楼主| 发表于 2013-1-14 22:59:59 | 显示全部楼层 |阅读模式
消息生产者(StreamMsgProducer)代码:
package easyway.activemq.app.demo3;import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.io.InputStream;import javax.jms.Connection;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.StreamMessage;import org.apache.activemq.ActiveMQConnectionFactory;import org.springframework.context.support.ClassPathXmlApplicationContext;/** * 消息的创建者 * @author longgangbai * */public class StreamMsgProducer {public static void main(String[] args) {ClassPathXmlApplicationContext ctx=new ClassPathXmlApplicationContext("activemq-jdbc.xml");ActiveMQConnectionFactory activeMqfactory=(ActiveMQConnectionFactory)ctx.getBean("connectionFactory");Connection conn = null;try {conn = activeMqfactory.createConnection();conn.start();Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);Destination queue = session.createQueue("streamMsg");MessageProducer producer = session.createProducer(queue);             File file=new File("C:\\send.txt");InputStream in = new FileInputStream(file);byte[] buffer = new byte[2048];int c = -1;while ((c = in.read(buffer)) > 0) {StreamMessage smsg = session.createStreamMessage();smsg.writeBytes(buffer, 0, c);producer.send(smsg);System.out.println("send: " + c);}in.close();} catch (JMSException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} finally {if (conn != null) {try {conn.close();} catch (JMSException e) {e.printStackTrace();}}}}} 
消息消费者(StreamMsgConsumer)代码:
package easyway.activemq.app.demo3;import java.io.FileOutputStream;import java.io.IOException;import java.io.OutputStream;import javax.jms.Connection;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.StreamMessage;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.xbean.BrokerFactoryBean;import org.springframework.context.support.ClassPathXmlApplicationContext;/** * 消息的消费者 * @author longgangbai * */public class StreamMsgConsumer {public void receive() {ClassPathXmlApplicationContext ctx=new ClassPathXmlApplicationContext("activemq-jdbc.xml");ActiveMQConnectionFactory activeMqfactory=(ActiveMQConnectionFactory)ctx.getBean("connectionFactory");Connection conn = null;try {conn = activeMqfactory.createConnection();conn.start();Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);Destination queue = session.createQueue("streamMsg");MessageConsumer consumer = session.createConsumer(queue);OutputStream out = new FileOutputStream("c:\\receive.txt");byte[] buffer = new byte[2048];while (true) {Message msg = consumer.receive(5000);if (msg == null) {break;}if (msg instanceof StreamMessage) {StreamMessage smsg = (StreamMessage) msg;int c = smsg.readBytes(buffer);out.write(buffer, 0, c);System.out.println("Receive: " + c);}}out.close();} catch (JMSException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} finally {if (conn != null) {try {conn.close();} catch (JMSException e) {e.printStackTrace();}}}}public static void main(String[] args) {new StreamMsgConsumer().receive();}} 
activemq.xml的配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Broker definition: a broker named "jdbcBroker" that persists messages through
  JDBC into an embedded Derby database and listens on tcp://localhost:61619.
-->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

  <!-- Allows us to use system properties as variables in this configuration file -->
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

  <broker useJmx="false" brokerName="jdbcBroker" xmlns="http://activemq.apache.org/schema/core">
    <persistenceAdapter>
      <!--
        "#derby-ds" refers to the Spring bean with id "derby-ds" declared below.
        No explicit JDBC adapter bean is configured here; NOTE(review): ActiveMQ is
        expected to pick the adapter from the JDBC driver - confirm for your version.
      -->
      <jdbcPersistenceAdapter dataDirectory="activemq-data" dataSource="#derby-ds"/>
    </persistenceAdapter>

    <transportConnectors>
      <!-- Must match the brokerURL used by the connection factory of the clients. -->
      <transportConnector name="default" uri="tcp://localhost:61619"/>
    </transportConnectors>
  </broker>

  <!-- Embedded Derby DataSource Sample Setup -->
  <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
    <property name="databaseName" value="derbydb"/>
    <!-- "create" makes Derby create the database if it does not exist yet. -->
    <property name="createDatabase" value="create"/>
  </bean>

</beans>
activemq-jdbc.xml的配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Application context loaded by the demo programs: it starts the broker described in
  classpath:activemq.xml and exposes a connection factory pointing at that broker.
-->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

  <!-- Builds the broker from activemq.xml; start=true asks the factory to start it. -->
  <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
    <property name="config" value="classpath:activemq.xml"/>
    <property name="start" value="true"/>
  </bean>

  <!-- depends-on makes Spring initialise the "broker" bean before this factory. -->
  <bean id="connectionFactory"
        class="org.apache.activemq.ActiveMQConnectionFactory"
        depends-on="broker">
    <property name="brokerURL" value="tcp://localhost:61619"/>
  </bean>

</beans>
 
 
您需要登录后才可以回帖 登录 | 立即注册 新浪微博账号登录

本版积分规则

快速回复 返回顶部 返回列表