MuleSoft 入門(11)ランタイムログの自動アーカイブ(2/2)

December 19, 2019 Minoru Nakanishi

こんにちは、アピリオ中西です。MuleSoft入門(11)ランタイムログの自動アーカイブ(1/2)に引き続き、ランタイムログの自動アーカイブ機能をもつMuleアプリケーションの実装をご紹介します。前回は Platform API を利用してログを取得するまでの流れをお話ししました。

最低限の運用であれば、この時点でのログ情報を外部ストレージにアップロードするだけでも十分かもしれません。しかし、ログの全量を日々アーカイブするのではストレージの利用効率が悪く、トラブルシューティング時にも問題のログの特定が難しくなってしまいます。そこで、今回の記事では、前日の処理で取得したログとの差分のみを出力できるような仕組みについて、詳しくお話しします。

前回の記事でご説明した処理の概要を再掲します。

(1) Anypoint Platform のユーザ名/パスワード認証を用いてアクセストークンを取得
(2) 指定した環境下の Mule アプリケーションを一括取得
(3) Mule アプリケーションのデプロイ情報を取得
(4) 各インスタンス(Worker)のログファイルを取得
(5) 前回実行時のログとの差分のみを取得
(6) Box API を利用してログファイルをアップロードする
(7) (3)~(6)の処理を、取得した Mule アプリケーション分繰り返し処理を行う

手順(5)からが本記事の内容となります。順を追ってご説明します。

 

(5) 前回実行時のログとの差分のみを取得

こちらが実装の要の部分になります。処理の要点は大きく分けて

 a. Watermark の実装

 b. ログの差分を取得するロジックの実装

の2点があります。

a. Watermark の実装

[ログ自動アーカイブのロジック]

差分の取得ロジックは、上記画像の "extractDiffs " Flow Reference コンポーネントに定義しています。ここでは、「Watermark」という概念が登場します。Watermarkとは、同期処理などを行う際に保存され、次回の同期で取得し比較されるタイムスタンプのことです。​「洪水時の水位を防波堤の壁に記録することで、高さの更新を判断すること」に由来し、前回との差分の有無を確認するために利用されます。

今回は日次でログを取得するため、前日とのログの差分のみをアーカイブする方が出力ファイルのデータ容量の削減やトラブルシューティングの際の調査の容易さに影響するため、Watermark の実装を行いました。最終行のログのタイムスタンプを比較することで、差分の有無を確認できます。

[ログの差分取得ロジック]

Watermark の実装には ObjectStore という、Key-Value 形式でデータを保持する仕組みを利用します。ランタイムログはインスタンス単位で保持されているため、インスタンス(Worker)の ID を一意な Key とし、Value にはログの最終行のタイムスタンプを格納するようにしました。

まず ObjectStore の "Retrieve" オペレーションで、Keyに紐づくタイムスタンプを取得します。この時、初回実行時の場合はKeyが保存されていないので "OS:KEY_NOT_FOUND" が返却されます。Try-catch 内のエラーハンドラでタイムスタンプの初回保存処理を行い、差分を保持するための変数には全ログデータを格納します。

2回目以降の実行では、ObjectStore に保存したタイムスタンプが取得されるようになるので、最終行のタイムスタンプと同じであれば「差分なし」、それ以外は「差分あり」として扱い、最新のタイムスタンプを "Store" オペレーションで上書きます。

b. ログの差分を取得するロジックの実装

前回の記事の終わりで解説しましたように、この時点で取得しているログ情報は次の通りです。

[2015-12-14 17:52:33.412] INFO    org.mule.module.launcher.application.DefaultMuleApplication [qtp18605202-31]: (t:null) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + Initializing app 'test-test'                             + ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 
[2015-12-14 17:52:34.065] INFO    com.mulesoft.ch.monitoring.MonitoringCoreExtension [qtp18605202-31]: (t:null) Monitoring enabled: true 
[2015-12-14 17:52:34.066] INFO    com.mulesoft.ch.monitoring.MonitoringCoreExtension [qtp18605202-31]: (t:null) Registering ping flow injector... 
[2015-12-14 17:52:34.083] INFO    com.mulesoft.ch.queue.boot.PersistentQueueCoreExtension [qtp18605202-31]: (t:null) The PersistentQueueManager is NOT configured. The normal VM queue manager will be used

(以下略)
...

このログ情報自体はただの String なので、扱いやすい形にするためにログの各行を要素にもつ配列に変換します。下記 DataWeave スクリプトを利用し、変換した配列を変数に格納しておきます。

vars.extractedLogs as String splitBy(/\n/) default[]

Anypoint Platform はクラウドアプリケーションのため、内部的にはタイムスタンプは UTC で保持されています。今回実装したスケジューラは CloudHub 上で動作するため、そのままログを取得すると全て UTC のタイムスタンプで表記されてしまいます。日本で運用する場合は JST への変換が必須なので、実装においても

・Watermark として利用する 「ログの最終行のタイムスタンプ」

・最終的に外部ストレージにアップロードするログファイルの「各行のタイムスタンプ」

の2つを UTC から JST に変換する必要があります。Mule アプリケーションの開発時には DataWeave を利用してデータの変換を行うと 第4回 の記事でご紹介しましたが、Mule アプリケーションは Java ファイルにロジックを定義して利用することもできます( src/main/java パッケージ以下に、通常の Java プロジェクト同様にクラスを作成します)。DataWeave で利用する場合は次のように記述します。

%dw 2.0 
output application/json 
import java!com::appirio::muleapp::util::CloudHubLogUtils // Javaファイルのインポート。classのパッケージを指定。
---
CloudHubLogUtils::getLatestLoggedTimeStr(vars.logLines) as String
// CloudHubLogUtils.java に定義したメソッド (抜粋)

public static String getLatestLoggedTimeStr(String[] logLines) { 
 return Arrays.asList(logLines).stream() 
      .filter(s -> StringUtils.isNotBlank(s) && isCloudHubLogPrefix(s))
      .map(s -> convert2JstLogLine(s)) 
      .map(s -> getCloudHubLoggedTimeStr(s)) 
      .reduce((first, second) -> second)
      .orElse(StringUtils.EMPTY); 
}

上記例では、ログの最終行のタイムスタンプ(JST)を取得するロジックを Java 8 の Stream API を利用して実装しています。このような複雑な処理を DataWeave で頑張って実装しても、結果的に可読性が悪くなりかねません。しかし、Java の方が得意だからといって全てを Java で実装してしまっては生産性の向上には繋がりません。DataWeave で実装する範囲と Java で実装する範囲の線引きの定義は明確には定まっていないので、今後の課題の一つでもあります。

少し話が逸れましたが、ログの最終行のタイムスタンプを取得するには 

[2015-12-14 17:52:34.083] INFO...

上記のようにタイムスタンプで始まる行を対象とする必要があります。Java の実装内容を簡単に説明しますと、

 1. 「文字列が "[" で始まる かつ その次の4文字が数字」の行に絞り込む

 2. ログのタイムスタンプ部分をを JST に変換する

(タイムスタンプ部分を文字列として切り出し、JST に変換した上で replace メソッドで置換する)

(参考) LocalDateTimeのZone変更

private static final ZoneId JST = ZoneId.of("Asia/Tokyo"); 
private static final ZoneId UTC = ZoneId.of("UTC");

private static LocalDateTime parseToJST(LocalDateTime targetTime) { 
  return targetTime.atZone(UTC).withZoneSameInstant(JST).toLocalDateTime(); 
}

 3. タイムスタンプの文字列のみに変換する

 4. 最後の要素を取得する

という構成にしています。この取得した LocalDateTime の文字列を Watermark として利用します。

ログの差分の取得に関しても、同様に Java でロジックを記述しています。詳しくは gitHub をご覧ください。

(7) Box API を利用してログファイルをアップロードする

Box API を利用する場合は、本記事では解説を割愛しますが事前にBoxとのJWT認証が必要なので リンク先 の手順をご確認ください。次の「ファイルのアップロード」API を利用します。 curl のサンプルコードは以下の通りです。

curl https://upload.box.com/api/2.0/files/content \ 
-H "Authorization: Bearer ACCESS_TOKEN" -X POST \ 
-F attributes='{"name":"tigers.jpeg", "parent":{"id":"11446498"}}' \ 
-F file=@myfile.jpg

ファイルのアップロード時には「ファイル情報」と「ファイルのアップロード先」という2つの情報を含める場合が多いと思います。その場合は DataWeave の MIME Type に multipart/form-data を指定します。今回の実装における例を以下に示します。

%dw 2.0 
output multipart/form-data 
var fileName = vars.instanceId as String ++ ".txt" 
--- 
{
 "parts": {
   "file": { 
     "headers": { 
       "Content-Disposition": { 
         "name": "file", 
         "filename": fileName 
       }, 
       "Content-Type": "application/octet-stream" 
     }, 
     "content": vars.logDifference as String 
   }, 
   "attributes": { 
     "headers": { 
       "Content-Type": "application/json" 
     }, 
     "content": { 
       "name": fileName, 
       "parent": { 
         "id": vars.domainFolderId as String 
       } 
     } 
   } 
 } 
}

このように、"parts" 以下にフィールドを複数指定し、それぞれに "headers" および "content" を指定することで複数の情報を定義できます。なお、ファイルなどの Binary 情報を扱う際は以下の Content-Type を指定します。

"Content-Type": "application/octet-stream"

なお、 MuleSoft 公式ガイドの DataWeave リファレンスにも Multipart が定義されていますが、実際は関数を利用しても上記例のように直接記載してもコード量はさほど変わりません。個人的には、一目で構造が把握できる範囲の情報量であれば、直接定義する方がわかりやすいのではと感じました。

(8) (3)~(7)の処理を、取得した Mule アプリケーション分繰り返し処理を行う

「(2) 指定した環境下の Mule アプリケーションを一括取得」 の手順で取得したドメインの数だけ、これらの手順を繰り返し実行します。今回の実装では、For Each コンポーネントを利用した繰り返し処理が複数箇所にあります。一般的にはどのプログラミング言語においても for ループ文の多重化は避けるべきですし、Mule アプリケーションにおいても例外ではないと思います。今回は止むを得ず多重ループを利用しています。

以下の画像のように、box にログが蓄積されます。

[box のログの出力結果]

 

いかがでしたでしょうか。本記事で紹介できたのは一部のコードのみでしたので、Mule アプリケーション全体を確認したい方は GitHubのプロジェクトをご参照ください。PoC としての実装のため、例外処理などは省略していますがご参考になればと思います。

本入門シリーズでは、MuleSoft の学習を始めたばかりの方を対象に Anypoint Platform の基本、Mule アプリケーションの開発手順、テスト方法、簡単な応用テクニックといった内容を北嵐と中西で全11回に渡ってお届けしました。MuleSoft はシステムの裏側で動くサービスであるため、その特徴をイメージしづらい所があるので、できるだけ具体的な内容を紹介できるように心掛けました。このブログが日本での MuleSoft の導入と展開に少しでもお役に立てれば幸いです。 

アピリオの MuleSoft チームでは、今後も MuleSoft の実践的なテクニックをこのテックブログ上でご紹介したいと思っていますので、ご期待ください。

 

[MuleSoft  入門シリーズ]

MuleSoft 入門(1)MuleSoft による API-led なアプリケーションネットワーク

MuleSoft 入門(2)ようこそ Anypoint Platform へ

MuleSoft 入門(3)Anypoint Studio による API の実装

MuleSoft 入門(4)DataWeave によるデータ変換

MuleSoft 入門(5)Flow Designer で Mule アプリケーションを開発

MuleSoft 入門(6)MUnit による単体テスト

MuleSoft 入門(7)API 開発におけるテストの考え方

MuleSoft 入門(8)ポリシーと SLA

MuleSoft 入門(9)カスタムポリシーの開発

MuleSoft 入門(10)Mule API のバージョン管理

MuleSoft 入門(11)ランタイムログの自動アーカイブ

著者について

2018年にアピリオに入社し、Salesforce を中心としたクラウドの世界に足を踏み入れました。2019年には MuleSoft のプロジェクトに携わり、API 主導の開発に強い興味と関心を持っています。IT 技術の変遷が激しい今だからこそ、勉強に励んで自分の価値を高めようと絶賛奮闘中です。

Minoru Nakanishi のコンテンツをもっと見る

戻れません

次へ
Apex 知識ゼロの Salesforce 初心者がバッチ開発担当となったが完成できた話
Apex 知識ゼロの Salesforce 初心者がバッチ開発担当となったが完成できた話

この6ヶ月の間で色濃く残る思い出といえば、なんといっても楽しいバッチ開発ライフを送れたことです。Java の知識を持つ私がどのようなステップを踏んで、Apex でバッチを書き上げたのかをご紹介します。

アピリオまでお気軽にお問合せください

ご質問はこちら