Blogical

AWS/Salesforceを中心に様々な情報を配信していきます(/・ω・)/

IoT SiteWise を使用してデータを取り込んでみた

こんにちは、ロジカル・アーツの井川です。

しばらく前に、PoC で IoT SiteWise を使用して工場の設備データをクラウドにアップするということをしました。当時は東京リージョンは対応していなかったのですが、それから色々変化があったので紹介したいと思います。

はじめに

PoC では Greengrass V1 + SiteWise の構成で設備データ取得のシステムを構築しましたが、当時 SiteWise は Greengrass V2 に対応しておらず、また東京リージョンでは使えませんでした。それからしばらくして東京リージョンでも SiteWise が利用できるようになりましたので、検証と備忘録を兼ねて、SiteWise を使用したデータの取り込みを記事にしました。(なお、東京リージョンでは Greengrass V1 をプラットフォームに選択できません*1。) 詳細はのちに述べますが、今回は Greengrass V2 + SiteWise の構成でデータを取得してみます(以下の構成図参照)。また、取得するデータは、OPC UA サーバが動作しているインスタンスの CPU 使用率(%)とメモリ使用量(MB)とします。

IoT Greengrass V2 とは

AWS IoT Greengrassは、IoT アプリケーションをデバイス上で構築、デプロイ、管理するのに役立つ、オープンソースのモノのインターネット (IoT) エッジランタイムおよびクラウドサービスです。

引用:https://docs.aws.amazon.com/ja_jp/greengrass/v2/developerguide/what-is-iot-greengrass.html

Greengrass に関する詳しい説明はここでは割愛します。エッジデバイス(ローカル)で動作するという点では V1 も V2 も変わりませんが、主要な概念は大きく異なっています。ここでは、(エッジ側での)SiteWise の動作環境としての役割があるくらいの理解で十分です。

IoT SiteWise とは

AWS IoT SiteWiseは、産業機器からデータを大規模に収集、モデル化、分析、視覚化できるマネージドサービスです。

引用:https://docs.aws.amazon.com/ja_jp/iot-sitewise/latest/userguide/what-is-sitewise.html

この記事ではデータ収集機能に焦点を当てて解説していきたいと思います。

SiteWise にデータを取り込むにはいくつか方法がありますが、今回はゲートウェイを使用します。ゲートウェイの他には次のものがあります。

  • IoT Core ルールを使用する
  • IoT Events アクションを使用する
  • Greengrass ストリームマネージャーを使用する
  • SiteWise API を使用する

ゲートウェイは Greengrass のコネクタ(V1)またはコンポーネント(V2)として動作します。Greengrass V2 の場合、データ収集プロトコルとしては OPC UA のみの対応となります*2

今回は Pythonオープンソースライブラリ asyncua を用いて OPC UA サーバを用意します。

まず、SiteWise の主要な概念から抑えましょう。

2022/11/25 追記
別記事で、この記事の設定を流用してデータソース認証の説明をしたかったのですが 、asyncua だと実現が難しそうだったので、Node.js のオープンソースライブラリ node-opcua を使用する手順も用意しました。
データソース認証も試されたい場合は、お手数ですが Appendix: node-opcua を使用した OPC UA サーバでの手順を参照して本文の一部を読み替えるようお願いします。

ゲートウェイ

一般に、工場内のサーバといった、(クラウドと対比して)産業機器に近いコンピューティング環境で動作し、データの収集、処理などを行います。(いわゆるエッジコンピューティング。)

アセットモデル

後述のアセットの構造を抽象化したものです。同じアセットモデルからは同じ構造を持ったアセットが作成されます。

アセットモデルでは、属性(静的な値)、測定(時系列データ)、変換、メトリクス、アセットの階層構造を定義できます。

アセット

産業機器データのクラウドでの格納先です。デバイス番号のような静的な値、温度のような時系列データなどを持ちます。

アセットプロパティ

各アセット内の構造のことです。各プロパティはデータ型と単位を持ちます。データ型は文字列、整数、ダブル、ブールの 4 種類です。また、プロパティは、属性、測定、変換、メトリクスの 4 種類です。

OPC UA とは

OPC は、産業オートメーション分野やその他業界における、安全で信頼性あるデータ交換を目的とした相互運用を行うための標準規格です。プラットフォームから独立し、多くの製造ベンダーのデバイス間で、シームレスな情報の流れを確保します。

引用:https://jp.opcfoundation.org/about/what-is-opc/

OPC UA を介することで、異なるベンダーのデバイスとでも統一的な通信が可能になります。産業機器自体が OPC UA に対応していてなくても、その機器と通信できる OPC UA サーバがあれば、SiteWise はサーバを介してデータを取り込むことができます。

この記事では実際の産業機器の代わりに、EC2 インスタンス自身を産業機器に見立てています。(加えて、Greengrass のプラットフォームとしての役割も持っています。)EC2 インスタンスの作成手順自体は省略しますが、特に次の点に注意して作成してください。また、インスタンスのメモリが 1GB だと OOM Killer が発生して OPC UA サーバが起動しない場合があるので、2GB 以上あるインスタンスタイプを選択することをお勧めします。その他の要件についてはこちらを参照してください。

  • サポートされている OS
OS アーキテクチャ
Ubuntu 20.04 または 18.04 x86_64 (AMD64)
Red Hat Enterprise Linux (RHEL) 8 x86_64 (AMD64)
Amazon Linux 2 x86_64 (AMD64)

なお、以下の検証では Amazon Linux 2 を使用しました。

手順

ゲートウェイの作成

SiteWise のコンソール画面に行き、左ペインの「ゲートウェイ」を選択して「ゲートウェイを作成」をクリックします。

以下の内容を入力・選択し、「次へ」をクリックします。

  1. ゲートウェイ」に「DemoGateway」と入力
  2. デフォルトセットアップ」を選択
  3. コアデバイス」に「DemoGatewayGreengrassCoreDevice」と入力

そのまま「次へ」をクリックします。

ここもデフォルトのまま、「次へ」をクリックします。

データソースは後で設定するので、ここでは「次へ」をクリックします。

最後の「ゲートウェイバイス OS」がデプロイする OS になっているかを確認して「生成」をクリックします。このとき、シェルスクリプトファイルがダウンロードされます。このスクリプトファイルはGreengrass V2 と SiteWise ゲートウェイのセットアップで使用します。

アセットモデルの作成

左ペインの「モデル」を選択し、「モデルの作成」をクリックします。

モデルの詳細」の「名前」に「DemoModel」と入力します。

冒頭に述べたように CPU 使用率(%)とメモリ使用量(MB)を取得するため、「測定の定義」に以下の情報を入力・選択します。

名前 単位 データ型
CPU % ダブル型
Memory MB ダブル型

その他はそのままにして、「モデルの作成」をクリックします。

アセットの作成

左ペインの「アセット」を選択し、「アセットの作成」をクリックします。

以下の内容を入力・選択し、「アセットの作成」をクリックします。

  1. モデル情報」の「モデル」で先程作成した「DemoModel」を選択
  2. アセットの情報」の「名前」に「DemoAsset」と入力

Greengrass V2 と SiteWise ゲートウェイのセットアップ

EC2 インスタンスゲートウェイの作成でダウンロードされたスクリプトscp コマンドなどで転送します。ファイル名は <ゲートウェイの名前>.deploy.sh のようになっていると思います。

転送できたら EC2 に接続し、スクリプトを実行します*3

chmod +x DemoGateway.deploy.sh
sudo ./DemoGateway.deploy.sh -y

しばらく待つとセットアップが終わっているかと思います。最後に Successfully set up Nucleus as a system service と表示されればうまくいってます。

OPC UA サーバの準備

はじめにで述べたように、OS の CPU 使用率(%)とメモリ使用量(MB)を取得します。このとき SiteWise 側で、取得するデータに対応するアセットプロパティにエイリアスを設定する必要があります。このエイリアスは OPC UAObjects ノードを起点とした変数ノードへのパスを、/ で始めたものになります。

以下の図では、下記の Python スクリプトで定義している OPC UA サーバのアドレス空間の構造と、SiteWise のアセットプロパティエイリアスとの対応関係を表しています。

スクリプトの作成

以下のコマンドで必要なライブラリをインストールします。psutil は OS の CPU 使用率とメモリ使用量を取得するために使用します。

python3 -m pip install asyncua psutil

以下の内容のコードを sample_server.py という名前で作成します。

import asyncio
import logging
import sys
import time

from asyncua import ua, Server
import psutil

stream_handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)

async def set_cpu(ua_var):
    cpu = psutil.cpu_percent()
    await ua_var.write_value(cpu)
    logger.info(f"CPU Usage: {cpu}%")

async def set_memory(ua_var):
    scale = 1024**2
    mem = psutil.virtual_memory().used / scale
    await ua_var.write_value(mem)
    logger.info(f'Memory Used: {mem}MB')

async def main():

    server = Server()
    await server.init()
    server.set_endpoint('opc.tcp://localhost:4840/freeopcua/server')

    uri = 'http://python-opcua.example.com'
    idx = await server.register_namespace(uri)

    objects = server.get_objects_node()

    obj_sample = await objects.add_object(idx, 'SampleObject')
    var_cpu = await obj_sample.add_variable(idx, 'CPU', 0.0, ua.VariantType.Double)
    var_mem = await obj_sample.add_variable(idx, 'Memory', 0.0, ua.VariantType.Double)

    await server.start()

    logger.info('Starting OPC UA Server ...')

    try:
        start = time.time()
        while True:
            await asyncio.sleep(1)
            elasped_time = time.time() - start

            await asyncio.gather(
                set_cpu(var_cpu),
                set_memory(var_mem))

            logger.info(f'Elasped Time: {elasped_time}sec')

    finally:
        await server.stop()

        logger.info('Stopping OPC UA Server ...')

if __name__ == '__main__':
    asyncio.run(main())

作成したら SiteWise 側の設定を行いましょう。

データソースの設定

設定した OPC UA サーバの情報に基づいてデータソースの設定を行っていきます。

SiteWise のコンソール画面の左ペインで「ゲートウェイ」を選択し、作成したゲートウェイの名前をクリックします。

データソースを追加」をクリックします。

以下の内容を入力・選択し、「追加」をクリックします。

項目 備考
ソース名 demo-server
データストリームのプレフィックス - オプション デフォルトでは「ソース名」と同じ値が入りますが、今回は使用しないので空にします
ローカルエンドポイント opc.tcp://localhost:4840/freeopcua/server 上記のサーバのスクリプトで設定したエンドポイントに対応しています
送信先 AWS IoT SiteWise

プロパティエイリアスの設定

さて、OPC UA サーバからデータを取得するにはエイリアスの設定をする必要がありました。

左ペインで「アセット」を選択し、作成したアセットの名前を選択します。そして「編集」をクリックします。

測定」の各項目に次の通りエイリアスを入力します。その後「保存」をクリックします。

項目名 エイリアス
CPU /SampleObject/CPU
Memory /SampleObject/Memory

動作確認

それでは OPC UA サーバを動かしてみましょう。EC2 上で下記のコマンドを実行してください。

python3 sample_server.py

データが取得できているかどうかは SiteWise のコンソール上で確認できます。以下の画像のように、当該アセットの「測定値」タブの「最新の値」欄に値が入っていれば取得できています。

おわりに

お疲れ様でした。記事中のハンズオンはできるだけ簡潔に、なおかつどうやったら SiteWise でデータを取り込めるかが分かるように心がけましたが、その分あまり踏み込まなかった部分も多くあります。(このあたりは機会があれば記事にしたいと思っています。)様々な概念・機能が絡んでくるので、何をやっているかを理解するのはなかなか大変なのですが、データの取り込み自体は(問題なく設定できてさえいれば)意外にも簡単です。SiteWise の記事(特にハンズオン)はあまり見かけないので、本記事が一助にでもなれば幸いです。

参考リンク

Appendix: node-opcua を使用した OPC UA サーバでの手順

node-opcua を使用する場合は、お手数ですが以下の説明を参考に本文の該当箇所を読み替えてください。 なお、検証時の Node.js バージョンは 16.18.1 です。

1.

本文中のスクリプトの作成を以下の内容に読み替えてください。

以下のコマンドで必要なライブラリをインストールします。

npm install node-opcua

以下の内容のコードを sample_server.mjs という名前で作成します。

import {
    DataType,
    OPCUAServer,
    Variant,
} from 'node-opcua';
import * as os from 'os';

let cpuUsage = {
    previousAvgIdel: 0.0,
    previousAvgTick: 0.0,
    get: function () {
        const currentCPUAvg = _getCPUAverage();
        const idelDiff = currentCPUAvg.avgIdle - this.previousAvgIdel;
        const tickDiff = currentCPUAvg.avgTick - this.previousAvgTick;
        const cpuUsage = 100 - (idelDiff / tickDiff) * 100;
        console.log(`CPU Usage: ${cpuUsage}%`);
        this.previousAvgIdel = currentCPUAvg.avgIdle;
        this.previousAvgTick = currentCPUAvg.avgTick;
        return cpuUsage;
    },
};
const { avgIdle, avgTick } = _getCPUAverage();
cpuUsage.previousAvgIdel = avgIdle;
cpuUsage.previousAvgTick = avgTick;

(async () => {
    try {
        // endpoint is opc.tcp://<hostname>:4334/UA/Server
        const server = new OPCUAServer({
            port: 4334,
            resourcePath: '/UA/Server',
        });

        await server.initialize();

        const addressSpace = server.engine.addressSpace;
        const namespace = addressSpace.getOwnNamespace();

        const device = namespace.addObject({
            organizedBy: addressSpace.rootFolder.objects,
            browseName: 'SampleObject',
        });

        namespace.addVariable({
            componentOf: device,
            browseName: 'CPU',
            dataType: DataType.Double,
            minimumSamplingInterval: 1000,
            value: {
                get: () =>
                    new Variant({
                        dataType: DataType.Double,
                        value: cpuUsage.get(),
                    }),
            },
        });
        namespace.addVariable({
            componentOf: device,
            browseName: 'Memory',
            dataType: DataType.Double,
            minimumSamplingInterval: 1000,
            value: {
                get: () =>
                    new Variant({
                        dataType: DataType.Double,
                        value: getMemoryUsageMB(),
                    }),
            },
        });

        await server.start();
        console.log('OPC Server is now starting ...');
        console.log(
            'endpoint: ',
            server.endpoints[0].endpointDescriptions()[0].endpointUrl
        );

        process.on('SIGINT', async () => {
            await server.shutdown();
            console.log('Stopping OPC UA Server ...');
        });
    } catch (err) {
        console.log(err);
        process.exit(1);
    }
})();

function _getCPUAverage() {
    const cpus = os.cpus();
    let totalIdel = 0;
    let totalTick = 0;
    for (const cpu of cpus) {
        for (const type in cpu.times) {
            totalTick += cpu.times[type];
        }
        totalIdel += cpu.times.idle;
    }
    return {
        avgIdle: totalIdel / cpus.length,
        avgTick: totalTick / cpus.length,
    };
}

function getMemoryUsageMB() {
    const memoryUsage = (os.totalmem() - os.freemem()) / 1024 ** 2;
    console.log(`Memory Used: ${memoryUsage}MB`);
    return memoryUsage;
}

2.

本文中のデータソースの設定で設定しているローカルエンドポイントの値を opc.tcp://localhost:4334/UA/Server に変更してください。
あるいは、ローカルエンドポイントの設定値はそのままにして、上記の sample_server.mjs の OPC UA サーバを定義している箇所を以下のように書き換えてください。

const server = new OPCUAServer({
    port: 4840,
    resourcePath: '/freeopcua/server',
});

3.

本文中の動作確認のコマンド python3 sample_server.pynode sample_server.mjs に変更してください。

*1:https://docs.aws.amazon.com/iot-sitewise/latest/userguide/gateways-ggv2.html または https://docs.aws.amazon.com/iot-sitewise/latest/userguide/gateways-ggv1.html

*2:Greengrass V1 では Modbus TCPEthernet/IP にも対応しています。

*3:-y オプションをつけると java-11-amazon-corretto-headless がインストールされます。