以前使ったappengine-mapreduceをプログラム内から実行する方法をまとめています。revision 150を対象としていますので、お使いのrevisionが新しすぎて使えない場合は公式資料を参照願います。
設定を作成する
まずは処理を実行するための設定を行います。ここで設定するべきは以下の2つです。
- 処理に用いるAppEngineMapperの設定
- 実行されるAppEngineMapperのサブクラス
- 入力として使われるInputFormat
- 処理完了後にPOSTするコールバックURL
これらをorg.apache.hadoop.conf.Configurationインスタンスに対して設定してやります。具体的には以下のとおりです。
Configuration conf = new Configuration(false); // 実行されるAppEngineMapperのサブクラス conf.setClass("mapreduce.map.class", CounterMapper.class, Mapper.class); // 入力として使われるInputFormat conf.setClass("mapreduce.inputformat.class", DatastoreInputFormat.class, InputFormat.class); // 入力として使われるKind conf.set("mapreduce.mapper.inputformat.datastoreinputformat.entitykind", "UserInfo"); // 処理完了後にPOSTするコールバックURL conf.set(AppEngineJobContext.DONE_CALLBACK_URL_KEY, "/count/complete");
設定をTaskに渡し、TaskQueueに積む
mapreduce処理はcom.google.appengine.tools.mapreduce.MapReduceServletを通じて実行できるので、このサーブレットを叩くためのTaskをTaskQueueに積んでやります。
設定はcom.google.appengine.tools.mapreduce.ConfigurationXmlUtilクラスのstaticメソッドによってXML形式に変換する必要があります。
String xml = ConfigurationXmlUtil.convertConfigurationToXml(conf); Queue queue = QueueFactory.getDefaultQueue(); TaskOptions task = TaskOptions.Builder.withDefaults() .url("/mapreduce/start").method(Method.POST) // "/mapreduce/*" が MapReduceServletにマッピングされている場合 .param("configuration", xml); queue.add(task);
コールバック処理から処理結果にアクセスする
Mapperで操作したカウンタなどの情報はすべてDatastoreのMapReduceStateというKindに記録されています。コールバックURLへのPOSTにはjob_idがパラメータとして渡されますので、これをキーとして引き出すことができます。
DatastoreService ds = DatastoreServiceFactory.getDatastoreService(); JobID id = JobID.forName(httpServletRequest.getParameter("job_id")); // JobIDをキーとして処理結果を取り出す MapReduceState state = MapReduceState.getMapReduceStateFromJobID(ds, id); // counterにアクセスする long userCount = state.getCounters().getGroup("counter").findCounter("all").getValue(); logger.info(String.format("USERS = %,d;%n", users));
以上です。コールバックURLを叩く処理やDatastoreから処理結果を取り出すメソッドが用意されているので、少ないコードで回したい処理を簡単に実行することが可能となっています。twistoireでは週に1回ユーザ数をカウントする際に使用する予定です。