ワークフローエンジンDigDagのプラグイン機能を試してみる
ワークフローエンジンのDigdagについて記事を書いてきましたが、v0.8.3からプラグイン機能が入ったというアナウンスがありましたので早速サンプルを動かしてみました。
ドキュメントやトラブル等があったのでそれらの対策等を紹介いたします。
また、Slackにメッセージを投稿するプラグインもサンプルで作ってみたのでやり方を紹介します。
プラグインプロジェクトのビルド
DigDagのプラグイン機能のサンプルはこちらにおいてあります。
(サンプルというかテスト用でしょうけど…)
こちらの部分を抜き出したフォルダ構成は次のようになります。
plugin ├── build │ ├── publications │ │ └── mavenJava │ │ └── pom-default.xml │ └── repo │ └── io │ └── digdag │ └── plugin │ └── digdag-plugin-example │ ├── 0.1.0 │ │ ├── digdag-plugin-example-0.1.0.jar │ │ ├── digdag-plugin-example-0.1.0.jar.md5 │ │ ├── digdag-plugin-example-0.1.0.jar.sha1 │ │ ├── digdag-plugin-example-0.1.0.pom │ │ ├── digdag-plugin-example-0.1.0.pom.md5 │ │ └── digdag-plugin-example-0.1.0.pom.sha1 │ ├── maven-metadata.xml │ ├── maven-metadata.xml.md5 │ └── maven-metadata.xml.sha1 ├── build.gradle ├── gradle │ └── wrapper │ ├── gradle-wrapper.jar │ └── gradle-wrapper.properties ├── gradlew ├── gradlew.bat └── src └── main ├── java │ └── io │ └── digdag │ └── plugin │ └── example │ ├── ExampleOperatorFactory.java │ └── ExamplePlugin.java └── resources └── META-INF └── services └── io.digdag.spi.Plugin
このプロジェクトフォルダ内の「digdag-plugin-example」がDigDagのプラグインのプロジェクトになっています。
中のプロジェクトはmaven-publishを利用してプロジェクトをビルドしてビルド生成物を作成します。
早速、digdag-plugin-exampleをビルドしてみると下記のようなエラーになります。
$ ./gradlew publish :generatePomFileForMavenJavaPublication :compileJava FAILURE: Build failed with an exception. * What went wrong: Could not resolve all dependencies for configuration ':provided'. > Could not resolve io.digdag:digdag-spi:0.8.3. Required by: io.digdag.plugin:digdag-plugin-example:0.1.0 > Could not resolve io.digdag:digdag-spi:0.8.3. > Could not get resource 's3://digdag-beta-release/maven/io/digdag/digdag-spi/0.8.3/digdag-spi-0.8.3.pom'. > The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId; Request ID: A915771EEF478CC7) > Could not resolve io.digdag:digdag-plugin-utils:0.8.3. Required by: io.digdag.plugin:digdag-plugin-example:0.1.0 > Could not resolve io.digdag:digdag-plugin-utils:0.8.3. > Could not get resource 's3://digdag-beta-release/maven/io/digdag/digdag-plugin-utils/0.8.3/digdag-plugin-utils-0.8.3.pom'. > The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId; Request ID: C3D5C4A0D8D6F3FD) * Try: Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. BUILD FAILED Total time: 13.331 secs
これはrepositoriesのmavenでAWSのS3を指定しているために発生しています。
これはS3へのアクセスができる人はそのまま使用することができるかもしれませんが、アクセス出来ない人はbuild.gradleでrepositoriesのmavenの指定を削除する必要があります。
削除した状態でビルドすると次のようなエラーが発生します。
$ ./gradlew publish :generatePomFileForMavenJavaPublication :compileJava FAILURE: Build failed with an exception. * What went wrong: Could not resolve all dependencies for configuration ':provided'. > Could not find io.digdag:digdag-spi:0.8.2. Searched in the following locations: file:/Users/muchiki0226/.m2/repository/io/digdag/digdag-spi/0.8.2/digdag-spi-0.8.2.pom file:/Users/muchiki0226/.m2/repository/io/digdag/digdag-spi/0.8.2/digdag-spi-0.8.2.jar https://repo1.maven.org/maven2/io/digdag/digdag-spi/0.8.2/digdag-spi-0.8.2.pom https://repo1.maven.org/maven2/io/digdag/digdag-spi/0.8.2/digdag-spi-0.8.2.jar https://jcenter.bintray.com/io/digdag/digdag-spi/0.8.2/digdag-spi-0.8.2.pom https://jcenter.bintray.com/io/digdag/digdag-spi/0.8.2/digdag-spi-0.8.2.jar Required by: io.digdag.plugin:digdag-plugin-example:0.1.0 > Could not find io.digdag:digdag-plugin-utils:0.8.2. Searched in the following locations: file:/Users/muchiki0226/.m2/repository/io/digdag/digdag-plugin-utils/0.8.2/digdag-plugin-utils-0.8.2.pom file:/Users/muchiki0226/.m2/repository/io/digdag/digdag-plugin-utils/0.8.2/digdag-plugin-utils-0.8.2.jar https://repo1.maven.org/maven2/io/digdag/digdag-plugin-utils/0.8.2/digdag-plugin-utils-0.8.2.pom https://repo1.maven.org/maven2/io/digdag/digdag-plugin-utils/0.8.2/digdag-plugin-utils-0.8.2.jar https://jcenter.bintray.com/io/digdag/digdag-plugin-utils/0.8.2/digdag-plugin-utils-0.8.2.pom https://jcenter.bintray.com/io/digdag/digdag-plugin-utils/0.8.2/digdag-plugin-utils-0.8.2.jar Required by: io.digdag.plugin:digdag-plugin-example:0.1.0 * Try: Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. BUILD FAILED Total time: 12.078 secs
DigDagプロジェクト内のモジュールの読み込みが行われていないため、特定のモジュールが見つからないためにこのエラーが問題が発生しています。
そこでDigDag自体のプジェクトをダウンロードしてきてビルドします。
$ git clone git@github.com:treasure-data/digdag.git $ cd digdag $ ./gradlew build
ビルドできたら次の2ファイルをコピーします。
- ./digdag-spi/build/libs/digdag-spi-0.8.4-SNAPSHOT.jar
- ./digdag-plugin-utils/build/libs/digdag-plugin-utils-0.8.4-SNAPSHOT.jar
この2ファイルをdigdag-plugin-exampleの中にlibsというフォルダを作成し、その中にコピーします。
これをビルド時に一緒にビルドすることでモジュールの問題は解決されますが、これをビルドしても必要なライブラリが含まれていないためエラーになります。
これらのエラーをすべて解消したbuild.gradleは次のようになります。
apply plugin: 'java' apply plugin: 'maven' apply plugin: 'maven-publish' group = 'io.digdag.plugin' version = '0.1.0' def digdagVersion = '0.8.2' repositories { mavenLocal() mavenCentral() jcenter() // maven { // url 's3://digdag-beta-release/maven' // credentials(AwsCredentials) { // accessKey "${System.env.AWS_ACCESS_KEY_ID}" // secretKey "${System.env.AWS_SECRET_ACCESS_KEY}" // } // } } //configurations { // provided //} dependencies { compile files('./lib/digdag-plugin-utils-0.8.4-SNAPSHOT.jar') compile files('./lib/digdag-spi-0.8.4-SNAPSHOT.jar') compile files('./lib/digdag-client-0.8.4-SNAPSHOT.jar') compile 'javax.inject:javax.inject:1' compile group: 'com.google.guava', name: 'guava', version: 'r05' compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.0.1' //provided 'io.digdag:digdag-spi:' + digdagVersion //provided 'io.digdag:digdag-plugin-utils:' + digdagVersion // this should be 'compile' once digdag 0.8.2 is released to a built-in repository } //sourceSets { // main { // compileClasspath += configurations.provided // test.compileClasspath += configurations.provided // test.runtimeClasspath += configurations.provided // } //} publishing { publications { mavenJava(MavenPublication) { from components.java } } repositories { maven { url "$buildDir/repo" } } } sourceCompatibility = 1.8 targetCompatibility = 1.8 compileJava.options.encoding = 'UTF-8' compileTestJava.options.encoding = 'UTF-8' javadoc { options { locale = 'en_US' encoding = 'UTF-8' } }
追加でguavaとjacksonを入れ、これでビルドすると./digdag-plugin-example/build/repo/io/digdag/plugin/digdag-plugin-example/0.1.0内にdigdag-plugin-example-0.1.0.jarが生成されます。
これで準備が整いました。
プラグインの実行
このフォルダ内のplugin.digを見てみると次のようになっています。
_export: plugin: repositories: - file://${repository_path} dependencies: - io.digdag.plugin:digdag-plugin-example:0.1.0 +step1: example>: template.txt message: yes path: example.out
exportにpluginという項目を記載し、プラグインのリポジトリのパスとdependenciesでプラグインの指定を行っています。
dependenciesは「groupId:artifactId:version」の構成で記入を行います。
groupId、artifactId、versionはプラグインのbuild.gradleに記載されている内容になります。
これで次のコマンドを実行するとタスクが実行されます。
$ digdag r plugin.dig -p repository_path=/<digdag-plugin-exampleをおいているパス>/digdag-plugin-example/build/repo ⏎ 2016-07-03 14:31:59 +0900: Digdag v0.8.3 2016-07-03 14:32:00 +0900 [WARN] (main): Using a new session time 2016-07-03T00:00:00+00:00. 2016-07-03 14:32:00 +0900 [INFO] (main): Using session .digdag/status/20160703T000000+0000. 2016-07-03 14:32:00 +0900 [INFO] (main): Starting a new session project id=1 workflow name=plugin session_time=2016-07-03T00:00:00+00:00 2016-07-03 14:32:00 +0900 [INFO] (0019@+plugin+step1): example>: template.txt Success. Task state is saved at .digdag/status/20160703T000000+0000 directory. * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time. * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
上記のように表示されたら、example.outが作成され中に下記のような文字列書き込まれています。
[example.out]
Worked? yes
これはtemplete.txtが下記のようになっており、messageの部分がプラグインによってstep1に記載されているmessageの部分に差し替えられていることがわかります。
[templete.txt]
Worked? ${message}
Slackに投稿するプラグイン
これでプラグインの環境は整いました。
今回は下記のページのIncoming WebHooksを使ってSlackへ文字列を投稿します。
まずdigファイルは次のようにします。
[plugin.dig]
_export: plugin: repositories: - file://${repository_path} dependencies: - io.digdag.plugin:digdag-plugin-example:0.1.0 +step1: slack>: message.txt webhook: https://hooks.slack.com/services/XXXXXXX/XXXX/XXXXXX channel: general username: webhookbot icon_emoji: ghost
slackコマンドでmessage.txtを読み込んで、webhook、channel、username、icon_emojiのそれぞれの情報を実行時に読み込んでSlackへ投稿するようにします。
プラグ新プロジェクト内のOperatorFactoryをimplementsしたSlackOperatorFactoryを次のように作成しました。
[SlackOperatorFactory.java]
public class SlackOperatorFactory implements OperatorFactory { private final TemplateEngine templateEngine; public SlackOperatorFactory(TemplateEngine templateEngine) { this.templateEngine = templateEngine; } public String getType() { return "slack"; } @Override public Operator newTaskExecutor(Path workspacePath, TaskRequest request) { return new ExampleOperator(workspacePath, request); } private class ExampleOperator extends BaseOperator { public ExampleOperator(Path workspacePath, TaskRequest request) { super(workspacePath, request); } @Override public TaskResult runTask() { Config params = request.getConfig().mergeDefault( request.getConfig().getNestedOrGetEmpty("slack")); String message = templateEngine.templateCommand(workspacePath, params, null, UTF_8); String url = params.get("webhook", String.class); String channel = "#" + params.get("channel", String.class); String username = params.get("username", String.class); String icon_emoji = ":" + params.get("icon_emoji", String.class) + ":"; System.out.println(url); System.out.println(message); System.out.println(channel); System.out.println(username); System.out.println(icon_emoji); Gson gson = new Gson(); String payload = gson.toJson(new SlackInfo(channel,username,message,icon_emoji)); System.out.println(payload); try { HttpResponse<String> result = Unirest.post(url) .header("Content-Type", "application/x-www-form-urlencoded") .field("payload", payload).asString(); } catch (UnirestException e) { e.printStackTrace(); } return TaskResult.empty(request); } } }
ここでの特徴はgetTypeでプラグインの「slack>」の>の前の部分を指定します。
次にタスクが実行された際に呼び出されるrunTaskを実装します。
runTask内でConfigクラスのparamsでタスク実行時に付与している情報を収集しUnirest.postでSlackへポストしています。
これ以外に次のファイルを作成しました。 基本的には名前を適切に変更したのみです。
[SlackPlugin.java]
public class SlackPlugin implements Plugin { @Override public <T> Class<? extends T> getServiceProvider(Class<T> type) { if (type == OperatorProvider.class) { return SlackOperatorProvider.class.asSubclass(type); } else { return null; } } public static class SlackOperatorProvider implements OperatorProvider { @Inject protected TemplateEngine templateEngine; @Override public List<OperatorFactory> get() { return Arrays.asList( new SlackOperatorFactory(templateEngine)); } } }
[SlackInfo.java]
public class SlackInfo { private String channel; private String username; private String text; private String icon_emoji; public SlackInfo(String channel, String username, String text, String icon_emoji) { this.channel = channel; this.username = username; this.text = text; this.icon_emoji = icon_emoji; } }
[resources/META-INF/services/io.digdag.spi.Plugin]
io.digdag.plugin.example.SlackPlugin
これで完成しました。 最後にmessage.txtに「digidagから投稿したよ」という文字列を書いていた場合は実行すると次の様になります。
[DigDagの実行]
$ digdag r plugin.dig -p repository_path=/<digdag-plugin-exampleをおいているパス>/digdag-plugin-example/build/repo 2016-07-03 21:05:19 +0900: Digdag v0.8.3 2016-07-03 21:05:20 +0900 [WARN] (main): Using a new session time 2016-07-03T00:00:00+00:00. 2016-07-03 21:05:20 +0900 [INFO] (main): Using session .digdag/status/20160703T000000+0000. 2016-07-03 21:05:20 +0900 [INFO] (main): Starting a new session project id=1 workflow name=plugin session_time=2016-07-03T00:00:00+00:00 2016-07-03 21:05:20 +0900 [INFO] (0019@+plugin+step1): slack>: message.txt https://hooks.slack.com/services/XXXXXX/XXXXXXX/XXXXXXXX digidagから投稿したよ #general[f:id:muchiki0226:20160703210833p:plain] webhookbot :ghost: {"channel":"#general","username":"webhookbot","text":"digidagから投稿したよ","icon_emoji":":ghost:"} Success. Task state is saved at .digdag/status/20160703T000000+0000 directory. * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time. * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
[Slackの結果]
実際の動くコードは下記を参照してください。