HadoopのIPC/RPC

Hadoopは内部的に独自のIPC/RPCフレームワークを用いています。
このフレームワークは、Writableを用いてバイナリでやりとりをします。


以下、実際に使ってみたメモです。


まず、プロトコルのインターフェースを定義します。
この時、versionIDも指定します。

public interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol {
	public static final long versionID = 1L;
	Text aaa();
}


次に、サーバー側のコードです。

public class Server implements ClientProtocol {
	public void start() throws IOException {
		Configuration conf = new Configuration();
		org.apache.hadoop.ipc.RPC.Server server = RPC.getServer(this, "localhost", 16000, conf);
		server.start();
	}

	@Override
	public long getProtocolVersion(String protocol, long clientVersion)
			throws IOException {
		return 1L;
	}

	@Override
	public Text aaa() {
		return new Text("aaa");
	}
}


RPC.getServerの第一引数に、プロトコルのインターフェースを実装した
クラスのインスタンスを指定します(上記ではthis)。
getProtocolVersionメソッドでは、プロトコルのインターフェースのversionIDと
同じバージョンを返さないとエラーになります。
バージョン管理のためのものですね。


最後にクライアントのコードです。

// サーバの起動
Server s = new Server();
s.start();

// クライアント
Configuration conf = new Configuration();
InetSocketAddress addr = new InetSocketAddress("localhost", 16000);

ClientProtocol client = (ClientProtocol) RPC.waitForProxy(
		ClientProtocol.class, ClientProtocol.versionID, addr, conf);
		
Text res = client.aaa();
System.out.println(res.toString()); // aaa


RPC.waitForProxyでプロトコルを取得し、
定義したメソッドを実行すると裏側で通信が起こりIPC/RPCが実現されます。


Hadoopではこのフレームワークを用いて、NameNodeやDataNode、JobTracker、TaskTracker、
そしてHDFSクライアント間で連携をしています。
ちなみに、HBaseもこのフレームワークを使っているっぽいです。


Hadoop内の関連プロダクトとしてはAvroがありますね。
http://avro.apache.org/