在Spark中,我们在Driver里放了一个RabbitMQ,接收消息,然后处理。避免一次次地上传Jar包,进行前面的准备操作。
然而,在实际操作中,发现了一些诡异的事情。
第一件事情就是,当我们使用yarn -kill
命令,将这个Job kill掉之后,我们发现,实际上Driver并没有被kill掉。还是会继续监听那个队列,而且,SparkContext
确实是被关闭掉了。
而正常的情况,应当是yarn -kill
以后,Driver被kill掉。
于是,我猜想,是因为在poll消息的时候,由于有wait存在,使Driver的线程阻塞了,不能接收到ApplicationMaster发送来的kill消息,导致Driver不会停掉。
然而,在验证这个想法的过程中,我发现,即使不poll消息,Driver还是不会停止。
代码如下:
package com.hyper.cdp.label.utils;
import com.hyper.util.CacheConfUtils;
import com.hyper.util.Log;
import com.hyper.util.Mapper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQAccessor implements Log {
private String host;
private String port;
private String username;
private String password;
private String vhost;
private String queue;
private ConnectionFactory connectionFactory;
private Connection connection;
private Channel channel;
private QueueingConsumer queueingConsumer;
public RabbitMQAccessor(String queue) {
this(CacheConfUtils.get("amqp.host", ""),
CacheConfUtils.get("amqp.port", "5672"),
CacheConfUtils.get("amqp.username", ""),
CacheConfUtils.get("amqp.password", ""),
CacheConfUtils.get("amqp.vhost", ""),
queue);
}
public RabbitMQAccessor(String host, String port, String username, String password,
String vhost, String queue) {
this.host = host;
this.port = port;
this.username = username;
this.password = password;
this.vhost = vhost;
this.queue = queue;
}
public void init() throws IOException, TimeoutException {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(Integer.valueOf(port));
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
connection = connectionFactory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(queue, true, false, false, null);
queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queue, queueingConsumer);
}
private QueueingConsumer.Delivery currentDelivery = null;
public String poll() {
String result = null;
try {
currentDelivery = queueingConsumer.nextDelivery();
if (currentDelivery != null) {
byte[] bs = currentDelivery.getBody();
result = new String(bs);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
public Object pollJson(Class targetClass) {
Object result = null;
String string = poll();
if (string == null || string.trim().equals("")) return null;
try {
result = Mapper.readValue(string, targetClass);
} catch (IOException e) {
e.printStackTrace();
}
return result;
}
public void ack() {
try {
channel.basicAck(currentDelivery.getEnvelope().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
public void close() {
try {
channel.close();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMQAccessor rabbitMQAccessor = new RabbitMQAccessor("host",
"5672",
"a",
"b",
"/c",
"queue");
rabbitMQAccessor.init();
System.out.println("------- before close");
rabbitMQAccessor.close();
System.out.println("------- after close");
}
}
就简单地初始化了一下,然后关闭一下,想结束掉这个程序,但是它竟然不给我停。
所以我就很奇怪。于是猜测是创建Connection时搞得鬼。
通过断点调试,还真发现是它在捣蛋。
在AMQConnection
这个类中,我们能看到,在创建Connection时,有这么一段:
我们再看MainLoop
的实现:
最重要的是那个while循环。
我们可以看到,只要_running
是true,它就会一直运行。而_running
只有当Connection关闭时,才会是false.
你可能会说,这个是子线程啊。跟主线程不结束有什么关系。
我们从StackOverflow中可以得知,在Java中,只要存在一个不是daemon的子线程还在运行,主线程就不会退出。
实际上,这个问题很简单。但是一年没碰这种Java的基础知识,好多内容都有点忘记了。所以啊,还是要温故知新啊。
另外,上面对Spark的那个问题,我的猜测是错误的。
上面的代码,正确的应该是,在close
方法中,加上connection.close()