Transgate という Node.js 製エージェントベースのタスクフローフレームワーク

Transgate というNode.js製のエージェントベースのタスクフロフレームワークを作った。

どうして作ったのか?

自宅の家電を操作するためのプログラムを書いていたら、色々なフローがごちゃごちゃになったから。 Dysonのファンから定期的に温度湿度を取得してデータベースに保存したり、Google Home/Assistant + IFTTT から来るメッセージを処理してIRKit を操作する。そのうち温度に従って自動的に IRKit 経由でエアコンを操作したくもなった、さてどう書こうかと?

どんなもの?

突然だけど空港などの荷物の仕分けをイメージしてください。エージェントは、ゲートから出てくるアイテムを受け取り、処理して別のゲートに送る。ゲートの向こう側がどうなっているかは、エージェントは何も知らない。エージェントは空のアイテムを来たら作業を終える。アーキテクチャのイメージはこんな感じです。

エージェントはゲートからアイテムを受け取ることと、別のゲートに新たにアイテムを送ることができる。アイテムはシンプルなオブジェクトだ。エージェントは自身のタスクに集中できる。だから前工程や次工程が増えても減ってもアイテムの構成が変わらなければ問題なく動く。そして入出力がシンプルなためユニットテストも簡単にかける。エージェントはゲートの実体を知らないので、入力元ゲートをスタブに、出力先ゲートをモックに、簡単に置き換えられる。

フレームワークに出てくる概念のまとめ

  • ゲート(Gate)はファイルストレージやデータベース、キュー、APIサービスといった入出力のエンドポイントです。
  • エージェント(Agent)はゲート間でアイテムを処理するタスクワーカーです。ゲートが何に通じているかは関知しません。
  • アイテム(Item)はゲート間を流れるシンプルなオブジェクトでエージェントが処理する対象です。

使用例

今回のフレームワークを作るきっかけになったホームコントロールプログラムを通じて説明してみます。 ちなみにこのプログラムは靴箱の中の Raspberry PI 上でデーモンとして動いています。

構成図

f:id:tilfin:20171123144501p:plain

メインプログラム (main.js)

const {
  Agent,
  HttpClientGate,
  HttpServerGate,
  IntervalGate,
  JointGate,
  StdoutGate,
  duplicator,
  mixer,
} = require('transgate');

const pino = require('pino')();
const config = require('konfig-yaml')();

const MongoGate = require('./lib/mongo_gate');
const IRKitGate = require('./lib/irkit_gate');

// Agent
const AnalysisCommander = require('./lib/analysis_commander');
const DysonCoolLinkRecorder = require('./lib/dyson/cool_link_recorder');
const EnvironmentalAnalyzer = require('./lib/environmental_analyzer');

// Gate
const slackGate = new HttpClientGate({ endpoint: config.slack.webhook_url });
const iftttGate = new HttpServerGate({ port: config.port });
const irkitGate = new IRKitGate(config.irkit.endpoint);
const intervalGate = new IntervalGate(60);
const mongoGate = new MongoGate(config.mongodb.endpoint, config.mongodb.collection);
const drToEaGate = new JointGate();

(async () => {
  try {
    await Agent.all(
      new AnalysisCommander(iftttGate, { irkitGate, slackGate }),
      new DysonCoolLinkRecorder(intervalGate, duplicator(mongoGate, drToEaGate)),
      new EnvironmentalAnalyzer(drToEaGate, { irkitGate, slackGate }),
    );
  } catch(err) {
    pino.error(err);  
    await iftttGate.close();
    await mongoGate.close();
  }

  intervalGate.clear();
})()
.catch(err => {
  pino.error(err);
});

7つのゲート

  • slackGate は slack にテキストメッセージをポストします。HttpClientGate のインスタンスで、アイテムとなるJSON{ "text": "<text message>" } です。
  • iftttGate は IFTTT の webhook から受け取った JSON をアイテムとして利用します。アイテムとなるJSON{ "target": "TV", "text": "<speaking words>" } です。
  • irkitGate はHTTPインターフェイスを備える赤外線送信器に命令します。アイテムとなるJSON{ "command": "celling_light_off" } です。
  • intervalGate は一定の間隔でアイテムを生成します。アイテムは { "time": <Date instance> } です。この場合は 1 分おきにエージェントの処理を走らせます。
  • mongoGate は MongoDB の指定のコレクションに送信されたアイテムを登録します。
  • drToEaGate は後述の DysonCoolLinkRecorder から EnvironmentalAnalyzer にアイテムの流すジョイントです。

3つのエージェント

  • AnalysisCommander は IFTTT の webhook から来た JSON をアイテムとして受け取り、操作対象とテキストから IRKit に対して送信すべき赤外線信号を指定します。slack には文言が解釈できなかったときにポストします。
  • DysonCoolLinkRecorder は Dyson PureCoolLink ファンから1分おきに温度と湿度を取得して、duplicator という複製機を挟んで MongoDB への書き込みとジョイントとなるゲートに送ります。
  • EnvironmentalAnalyzer はそのジョイントを通じて来た温度から閾値を超えていたらエアコンの操作を IRKit に要求します。自動的に操作をしたときは slack に記録します。

エージェントの実装

Agentのサブクラスを作ります。main メソッドで受け取ったアイテムを処理して指定先のゲートに新たなアイテムを送る処理を書きます。before/after のフックメソッドを使って、初期化処理や別に利用するプロセス(例えば headless chrome) をここで制御(起動・停止)します。

下記は EnvironmentalAnalyzer の実装例でです。室温が摂氏17度以下になったらエアコンをオンにします。

const { Agent } = require('transgate');

module.exports = 
class EnvironmentalAnalyzer extends Agent {
  async before() {
    this._preTemp = null;
    this._airconAlive = false;
  }

  async main(item, { irkitGate, slackGate }) {
    const curTemp = item.temp;

    if (this._preTemp && this._preTemp > 17 && curTemp <= 17) {
      if (!this._airconAlive) {
        await irkitGate.sendAll({ command: 'aircon_on' });
        this._airconAlive = true;
        await slackGate.send({ text: `Turn on aircon because temp is down to ${curTemp}` });          
      }
    }

    this._preTemp = curTemp;
  }
}

コンストラクタとアイテムの入力元ゲートが隠蔽されているのは、 null を受け取ると次のゲートに送り、自身は終了するという仕様の実装を意識させないためです。

特徴のまとめ

  • 複雑なデーモンやバッチプログラムに向いている。
  • 同じエージェントを並列で動かすようなことは想定していないので、大量に捌く処理には向いてない。
  • メインプログラムで登場するゲートとエージェントと、アイテムのタスクフローが定義できる。そのためこれだけで全体が把握できる。
  • エージェントの処理は async/await で擬似的に同期に書けつつ、エージェントが多くても Node.js なのでスレッドベースのように重くならない。
  • ゲートの置き換えが容易なので、エージェントのユニットテストが書きやすく、部分的な実行の確認もしやすい。

予想される疑問と答え

参照先サービスは全部ゲートになるのか? 

Noです。ゲート間は一方通行に限定されます。エージェントはその先を知らない。つまりリクエストを投げて、それに対するレスポンスを得ることはできません。往復ではなく、ループにすることは可能ですが、ステートレスなのでどの送り出したアイテム(リクエスト)に対してのレスポンスかはわからないのです。ゲートは、エージェントにとってトリガーとなるものを出す部分と成果を送る部分になります。

一連のフローが終わったら時にキッカーに完了を通知するには?

キューシステムはタスクが完了したら完了通知を送る必要が往往にあります。こういった場合は、アイテムにそのコンテキストを持たせてフローに流して、最後のゲートが完了通知を送る役割を担うようにします。

ロガーはゲートにすべきか?

ログがアウトプットそのものになるならゲートにすべきです。そうすれば後からゲートをさらに Agent にジョイントするものに置き換えて、そこからログ解析サービスに投げるといった修正も容易にできます。

ゲートにどこまでロジックを含めていいのか?

ゲートはできる限りシンプルな方が良いです。エージェントはテストしやすいように設計しますが、ゲートそのものにロジックを入れてしまうと入出力先を付け替えてテストできなくなります。ただプロジェクト共通のロジックでそれがフォーマット程度であれば、ゲートに実装してもいいでしょう。複雑ならばそれ用のエージェントを作ってゲートの前に置き、ジョイントで繋げるだけです。

Transgate に興味を持っていただけたら幸いです。

English version Transgate is Agent-based taskflow framework for Node.js - DEV Community 👩‍💻👨‍💻