Hama的文档目前还不详细,没有找到关于Hama如何发送消息的说明,只好自己动手做实验了。
按照BSP的模型,每一个超步应该是有明确的三个步骤的:计算->通信->同步
但是Hama当中,在节点进入同步状态之前,是否可以和其他结点即时地收发消息呢?如果可以,无疑会使得bsp程序更加灵活,但是这样也会带来不必要的麻烦:如果bsp程序设计不当,各个节点之间随意通信可能会使得程序的性能非常糟糕。
为了搞清楚通信的情况,做了如下实验:
打开计算PI的example,在MyEstimator的bsp方法中调用peer.send之前设置断点,其实send方法的注释文档就已经告诉我们一些有用的信息了:
* Send a data with a tag to another BSPSlave corresponding to hostname.
* Messages sent by this method are not guaranteed to be received in a sent
* order.
可见hama bsp并不严格地保证消息的接收和发送顺序一致。
在eclipse中开始debug(单击模式,此时hama用多线程来模拟多个节点的计算),发现调用send最终执行的是如下的一段代码:
[java]
InetSocketAddress targetPeerAddress = null;
// Get socket for target peer.
if (peerSocketCache.containsKey(peerName)) {
targetPeerAddress = peerSocketCache.get(peerName);
} else {
targetPeerAddress = BSPNetUtils.getAddress(peerName);
peerSocketCache.put(peerName, targetPeerAddress);
}
MessageQueue<M> queue = outgoingQueues.get(targetPeerAddress);
if (queue == null) {
queue = getQueue();
}
queue.add(msg);
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
outgoingQueues.put(targetPeerAddress, queue);
notifySentMessage(peerName, msg);
可见这里是将要发送的消息添加到队列中了,并没有将消息发送出去。
继续debug,发现调用sync方法执行的是如下的代码:
[java]
// normally all messages should been send now, finalizing the send phase
messenger.finishSendPhase();
Iterator<Entry<InetSocketAddress, MessageQueue<M>>> it = messenger
.getMessageIterator();
while (it.hasNext()) {
Entry<InetSocketAddress, MessageQueue<M>> entry = it.next();
final InetSocketAddress addr = entry.getKey();
final Iterable<M> messages = entry.getValue();
final BSPMessageBundle<M> bundle = combineMessages(messages);
// remove this message during runtime to save a bit of memory
it.remove();
try {
messenger.transfer(addr, bundle);
} catch (Exception e) {
LOG.error("Error while sending messages", e);
}
}
if (this.faultToleranceService != null) {
try {
this.faultToleranceService.beforeBarrier();
} catch (Exception e) {
throw new IOException(e);
}
}
long startBarrier = System.currentTimeMillis();
enterBarrier();
if (this.faultToleranceService != null) {
try {
this.faultToleranceService.duringBarrier();
} catch (Exception e) {
throw new IOException(e);
}
}
// Clear outgoing queues.
messenger.clearOutgoingQueues();
leaveBarrier();
incrementCounter(PeerCounter.TIME_IN_SYNC_MS,
(System.currentTimeMillis() - startBarrier));
incrementCounter(PeerCounter.SUPERSTEP_SUM, 1L);
currentTaskStatus.setCounters(counters);
if (this.faultToleranceService != null) {
try {
this.faultToleranceService.afterBarrier();
} catch (Exception e) {
throw new IOException(e);
}
}
umbilical.statusUpdate(taskId, currentTaskStatus);
从第一行的注释即可看出,之前send要发送的消息在开始同步时才会真正地发送出去。此外,貌似在
[java] view plaincopy
messenger.clearOutgoingQueues();
中会准备好本地的消息队列,之后才可以读取从其他结点发送过来的消息,具体怎么收消息还没研究好,经过实验发现似乎是在sync返回之后,才能接受到从其他节点发送过来的消息,在sync之前getCurrentMessage()得到的消息总是空值。
由此大概得出了结论:hama bsp在一个超步中只能发消息或者处理上一个超步中接收到的消息。