Subscribed unsubscribe Subscribe Subscribe

Kengo's blog

Technical articles about original projects, JVM, Static Analysis and JavaScript.

appengine-mapreduceをプログラムから実行する

以前使ったappengine-mapreduceをプログラム内から実行する方法をまとめています。revision 150を対象としていますので、お使いのrevisionが新しすぎて使えない場合は公式資料を参照願います。

設定を作成する

まずは処理を実行するための設定を行います。ここで設定するべきは以下の2つです。

  • 処理に用いるAppEngineMapperの設定
    • 実行されるAppEngineMapperのサブクラス
    • 入力として使われるInputFormat
  • 処理完了後にPOSTするコールバックURL

これらをorg.apache.hadoop.conf.Configurationインスタンスに対して設定してやります。具体的には以下のとおりです。

Configuration conf = new Configuration(false);

// 実行されるAppEngineMapperのサブクラス
conf.setClass("mapreduce.map.class", CounterMapper.class, Mapper.class);

// 入力として使われるInputFormat
conf.setClass("mapreduce.inputformat.class", DatastoreInputFormat.class, InputFormat.class);
// 入力として使われるKind
conf.set("mapreduce.mapper.inputformat.datastoreinputformat.entitykind", "UserInfo");

// 処理完了後にPOSTするコールバックURL
conf.set(AppEngineJobContext.DONE_CALLBACK_URL_KEY, "/count/complete");

設定をTaskに渡し、TaskQueueに積む

mapreduce処理はcom.google.appengine.tools.mapreduce.MapReduceServletを通じて実行できるので、このサーブレットを叩くためのTaskをTaskQueueに積んでやります。
設定はcom.google.appengine.tools.mapreduce.ConfigurationXmlUtilクラスのstaticメソッドによってXML形式に変換する必要があります。

String xml = ConfigurationXmlUtil.convertConfigurationToXml(conf);
Queue queue = QueueFactory.getDefaultQueue();
TaskOptions task = TaskOptions.Builder.withDefaults()
		.url("/mapreduce/start").method(Method.POST)  // "/mapreduce/*" が MapReduceServletにマッピングされている場合
		.param("configuration", xml);
queue.add(task);

コールバック処理から処理結果にアクセスする

Mapperで操作したカウンタなどの情報はすべてDatastoreのMapReduceStateというKindに記録されています。コールバックURLへのPOSTにはjob_idがパラメータとして渡されますので、これをキーとして引き出すことができます。

DatastoreService ds = DatastoreServiceFactory.getDatastoreService();
JobID id = JobID.forName(httpServletRequest.getParameter("job_id"));
// JobIDをキーとして処理結果を取り出す
MapReduceState state = MapReduceState.getMapReduceStateFromJobID(ds, id);

// counterにアクセスする
long userCount = state.getCounters().getGroup("counter").findCounter("all").getValue();
logger.info(String.format("USERS = %,d;%n", users));


以上です。コールバックURLを叩く処理やDatastoreから処理結果を取り出すメソッドが用意されているので、少ないコードで回したい処理を簡単に実行することが可能となっています。twistoireでは週に1回ユーザ数をカウントする際に使用する予定です。