当前位置:编程学习 > JAVA >>

Hama学习笔记 -通信

Hama的文档目前还不详细,没有找到关于Hama如何发送消息的说明,只好自己动手做实验了。
 
按照BSP的模型,每一个超步应该是有明确的三个步骤的:计算->通信->同步
 
但是Hama当中,在节点进入同步状态之前,是否可以和其他结点即时地收发消息呢?如果可以,无疑会使得bsp程序更加灵活,但是这样也会带来不必要的麻烦:如果bsp程序设计不当,各个节点之间随意通信可能会使得程序的性能非常糟糕。
 
为了搞清楚通信的情况,做了如下实验:
 
打开计算PI的example,在MyEstimator的bsp方法中调用peer.send之前设置断点,其实send方法的注释文档就已经告诉我们一些有用的信息了:
 
   * Send a data with a tag to another BSPSlave corresponding to hostname.
   * Messages sent by this method are not guaranteed to be received in a sent
   * order.
 
可见hama bsp并不严格地保证消息的接收和发送顺序一致。
在eclipse中开始debug(单击模式,此时hama用多线程来模拟多个节点的计算),发现调用send最终执行的是如下的一段代码:
 
[java]  
InetSocketAddress targetPeerAddress = null;  
// Get socket for target peer.  
if (peerSocketCache.containsKey(peerName)) {  
  targetPeerAddress = peerSocketCache.get(peerName);  
} else {  
  targetPeerAddress = BSPNetUtils.getAddress(peerName);  
  peerSocketCache.put(peerName, targetPeerAddress);  
}  
MessageQueue<M> queue = outgoingQueues.get(targetPeerAddress);  
if (queue == null) {  
  queue = getQueue();  
}  
queue.add(msg);  
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);  
outgoingQueues.put(targetPeerAddress, queue);  
notifySentMessage(peerName, msg);  
 
可见这里是将要发送的消息添加到队列中了,并没有将消息发送出去。
继续debug,发现调用sync方法执行的是如下的代码:
 
[java] 
// normally all messages should been send now, finalizing the send phase  
messenger.finishSendPhase();  
Iterator<Entry<InetSocketAddress, MessageQueue<M>>> it = messenger  
    .getMessageIterator();  
  
while (it.hasNext()) {  
  Entry<InetSocketAddress, MessageQueue<M>> entry = it.next();  
  final InetSocketAddress addr = entry.getKey();  
  final Iterable<M> messages = entry.getValue();  
  
  final BSPMessageBundle<M> bundle = combineMessages(messages);  
  // remove this message during runtime to save a bit of memory  
  it.remove();  
  try {  
    messenger.transfer(addr, bundle);  
  } catch (Exception e) {  
    LOG.error("Error while sending messages", e);  
  }  
}  
  
if (this.faultToleranceService != null) {  
  try {  
    this.faultToleranceService.beforeBarrier();  
  } catch (Exception e) {  
    throw new IOException(e);  
  }  
}  
  
long startBarrier = System.currentTimeMillis();  
enterBarrier();  
  
if (this.faultToleranceService != null) {  
  try {  
    this.faultToleranceService.duringBarrier();  
  } catch (Exception e) {  
    throw new IOException(e);  
  }  
}  
  
// Clear outgoing queues.  
messenger.clearOutgoingQueues();  
  
leaveBarrier();  
  
incrementCounter(PeerCounter.TIME_IN_SYNC_MS,  
    (System.currentTimeMillis() - startBarrier));  
incrementCounter(PeerCounter.SUPERSTEP_SUM, 1L);  
  
currentTaskStatus.setCounters(counters);  
  
if (this.faultToleranceService != null) {  
  try {  
    this.faultToleranceService.afterBarrier();  
  } catch (Exception e) {  
    throw new IOException(e);  
  }  
}  
  
umbilical.statusUpdate(taskId, currentTaskStatus);  
 
从第一行的注释即可看出,之前send要发送的消息在开始同步时才会真正地发送出去。此外,貌似在
[java] view plaincopy
messenger.clearOutgoingQueues();  
中会准备好本地的消息队列,之后才可以读取从其他结点发送过来的消息,具体怎么收消息还没研究好,经过实验发现似乎是在sync返回之后,才能接受到从其他节点发送过来的消息,在sync之前getCurrentMessage()得到的消息总是空值。
 
由此大概得出了结论:hama bsp在一个超步中只能发消息或者处理上一个超步中接收到的消息。
补充:软件开发 , Java ,
CopyRight © 2022 站长资源库 编程知识问答 zzzyk.com All Rights Reserved
部分文章来自网络,