RabbitMQ进阶操作

一:上篇大概介绍了rabbitMQ的基本信息和简单的发送接收,这篇文章说一下进阶一点的,也是在项目中运用的,仅供参考,代码复制可用。废话不多说,直接上代码。

二:实际上生产者发送消息,并不是直接和消息队列接触的,它是和交换机EXCHANGE交互,交换机负责将消息分发到绑定的各个消息队列。消费者想要消费消息,要先绑定交换机和相应的消息队列,也就是QUEUE。

1:生产者代码。代码读取一个文件,按行读取,按行发送。

//定义队列名称
private final static String QUEUE_NAME = "appreportdata_800021";
//定义交换机名称
private final static String EXCHANGE_NAME = "projectcode_exchange_direct";
public static void send(String content) {
		ConnectionFactory factory = null;
		Connection connection = null;
		Channel channel = null;
		try {
			factory = new ConnectionFactory();
			factory.setHost("localhost");
			factory.setPort(5672);
			factory.setUsername("guest");
			factory.setPassword("guest");
			connection = factory.newConnection();
                        channel.basicQos(1);
			channel = connection.createChannel();
                        //绑定交换机
			channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);
                        //订阅消息
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                         //绑定队列
			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "app_800021");  
			BufferedReader reader = null;
	        StringBuffer sb = new StringBuffer();
	        String rc1 = "";
	        String rc="utf-8";
	        try {
				if(rc.equals("")){
	        		rc1 = "GB2312";
	        	}else{
	        		rc1 = rc;
	        	}
	           // reader = new BufferedReader(new FileReader(file));
				File file = new File("文件路径");
	            reader = new BufferedReader(new InputStreamReader(new FileInputStream(file),rc1));
	            String tempString = null;
	            // 一次读入一行,直到读入null为文件结束
	            while ((tempString = reader.readLine()) != null) {
	                // 显示行号
	              System.out.println("line "+ tempString);
	                sb.append(tempString);
                        channel.confirmSelect();
	                channel.basicPublish(EXCHANGE_NAME, "app_800021", MessageProperties.PERSISTENT_BASIC, tempString.getBytes("UTF-8"));
                        channel.waitForConfirmsOrDie();
	                //sb.append("\n\r");
	            }
	            
	            reader.close();
	        } catch (IOException e) {
	            e.printStackTrace();
	        } finally {
	            if (reader != null) {
	                try {
	                    reader.close();
	                } catch (IOException e1) {
	                }
	            }
	        }
			
			//System.out.println("已经发送消息....." + content);
		} catch (IOException e) {
			e.printStackTrace();
		} catch (TimeoutException e) {
			e.printStackTrace();
		} finally {
			if (channel != null) {
				// 关闭资源
				 channel.close();
			}
			if (connection != null) {
				connection.close();
			}
		}
	}

2:消费者代码

  //队列名称      
String QUEUE_NAME="";
	ConnectionFactory factory = null;
	Connection connection = null;
	Channel channel = null;
//交换机名称
	private String EXCHANGE_NAME = "";
//路由键
	private String ROUTE_KEY = "";
                QUEUE_NAME = "appreportdata_800021";
		EXCHANGE_NAME="projectcode_exchange_direct";
		factory = new ConnectionFactory();
		factory.setHost("ip");
		try {
			connection = factory.newConnection();
			channel = connection.createChannel();
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTE_KEY);
Consumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
						byte[] body) throws IOException {
					//获取队列中的内容
					String content = new String(body, "UTF-8");
					if (content != null && !("").equals(content)) {
						if (!"".equals(tableName) && tableName != null && content.contains(tableName)) {
							flag = false;

						}
//消费信息逻辑处理,
						//接收消息回复ack
						channel.basicAck(envelope.getDeliveryTag(), false);
					}					
				}
			};
			//设置第二个参数为false 当noAck=false时,RabbitMQ会等待消费者显式发回
			channel.basicConsume(QUEUE_NAME, false, consumer);
		} catch (IOException e) {
			//e.printStackTrace();
		} catch (TimeoutException e) {
	
		}

生产消费端代码基本框架就是像这些,具体业务逻辑,可以具体处理。

3:这里还有一些问题,消费端消费信息的时候是线程阻塞的,没有消息的时候消费端是一直等着的,也就是说程序是一直在public void handleDelivery这个方法里停着,监听着rabbitMQ服务上相应的消息发布。这个是被动式的接受消息,那么问题来了,当接收的信息是sql语句的时候,接收是一条一条接收的,但是接收一条就进行一次数据库操作,显然是效率非常低的,这时会想着把数据存储在临时list里,批量操作,那么问题又来了,批量操作是有条件的,比如说达到500条的时候进行一次数据库操作,但是消费端发送700条,会有200条达不到条件,不会进行批量入库操作,但是此时程序是阻塞的,判断条件什么的不管用的,只会等着达到500条的时候才会批量操作入库,但是数据要求及时入库,这时就不行了。但是问题还是要解决的。

4:解决上面问题,rabbitMQ还有一种获取消息的方式,这是主动式的拉取数据,但是没有消息的时候会抛异常的。代码如下

                GetResponse response = null;               
                factory = new ConnectionFactory();
		factory.setHost(ConfParser.host);
		factory.setAutomaticRecoveryEnabled(true);
		factory.setNetworkRecoveryInterval(10);// 设置 每10s ,重试一次  
		factory.setTopologyRecoveryEnabled(false);// 设置不重新声明交换器,队列等信息。 
		try {
			connection = factory.newConnection();
			channel = connection.createChannel();
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTE_KEY);
			
		} catch (IOException e) {
			e.printStackTrace();
		} catch (TimeoutException e) {
			e.printStackTrace();
		}

public void getMessageAndDoIt() {
				boolean autoAck = false;
				while (true) {
					try {
						response = channel.basicGet(QUEUE_NAME, autoAck);
					
						byte[] body = response.getBody();
						content = new String(body, "UTF-8");
						sqlList.add(content);
						//每500条数据提交一次
						if (sqlList.size()==500) {
							//执行入库动作
							doActionSql();
						}
						long deliveryTag = response.getEnvelope().getDeliveryTag();
						channel.basicAck(deliveryTag, false);
					} catch (Exception e) {	
						//正在执行接收消息任务的时候,channel出现异常,直接执行入库任务,保证sqlList内的数据不丢失
						if (sqlList.size()>0) {
							//执行入库动作
							doActionSql();
						}
try {
							Thread.sleep(1000*60*2);
						} catch ( InterruptedException e1) {
							//休眠状态下连接断的话,休眠前数据已经全部进入入库步骤,不需考虑
							e.printStackTrace();
						}
		                        }
				}
			}

大概说一下业务逻辑,关键就是这个 GetResponse 这个方法,response = channel.basicGet(QUEUE_NAME, autoAck);用死循环一直去消息队列里获取数据,当没有数据的时候会抛异常,这时候就可以在有异常的时候知道消息队列没有数据了,没有达到批量上限的数据也可以在异常处理里做入库操作了。完美解决。

代码仅供参考,如有不对的地方敬请谅解和指正。希望能给大家带来帮助。

 

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值