AWS Iot Coreについて

こんにちは久しぶりですね。 今回はAWS Iot Coreについて調べたので、記事にして発信していきたいと思います。 日本語のドキュメントがかなり少ないサービスでしたので、かなり苦労しました。

AWS Iot Core とは

AWSの公式サイトには以下のように書いてあります。

AWS IoT Core は、インターネットに接続されたデバイスから、クラウドアプリケーションやその他のデバイスに簡単かつ安全に通信するためのマネージド型クラウドサービスです。

MQTT

本編に入る前に今回の記事はMQTTが非常に大事な概念だと思いましたのでMQTTプロトコルについて説明しようと思います。

MQTTとはHTTPやSSHと同じメッセージプロトコルです。IANAによる定義ですと、TCP接続は1883番ポートを、TLS接続は8883番ポートを利用することになっているそうです。

ただ、AWSのIotに関しては本来HTTPSで利用するはずの443番をMQTT over TLSとして利用することもあるそうです。 理由としてはMQTT(8883)はあまり一般的ではないためクライアントの環境でプロトコルに使用されるポートが意図的にブロックされる可能性があるためのようです。 さらに詳しく知りたい方は以下にAWSの公式ブログのリンクを用意しましたので、一度読んでみてください!

AWS公式ブログ:ポート443でTLS認証を使ったMQTT

PubSub

MQTT は一対多の通信、つまり1つのクライアントのメッセージを、たくさんの人に配る仕組みを持っています。これは PubSub と呼ばれる仕組みです。

MQTT ブローカー

PubSub でメッセージを配信するサーバのことを MQTT ブローカーと呼びます。 クライアントからメッセージを受け取って、クライアントにメッセージを配るサーバだそうです。

Publisher と Subscriber

PubSub とは Publisher (パブリッシャー) と Subscriber (サブスクライバー) を繋いだ言葉です。

  • Publisher(送信者)からTopicにメッセージを送る
  • Subscriber(受信者)はTopicからメッセージを受けとる

ポイントとしてはPublisherから直接Subscriberに送信しないということですね。

それですと一対多の通信の通信ができないですね。

この際Subscriberがいくつあっても通信は成功するようです。

Topic

MQTT にはトピックという仕組みがあります。

これは メッセージの送り先を指定することができる と同時に メッセージを指定して受け取ることができる 機能です。

トピック例:

sdk/test/Python
aa/bb/cc

受信側はこれらのトピックを指定して、どの情報を受け取りたいかを MQTT ブローカに送ります

QoS

QoS とは Quality of Service の略です。

簡単に行ってしまえば、相手に届けるときの確実性を定義出来ます。

QoS には 3 段階あります。

  • QoS レベル 0 は 最大 1 回 届きます
  • QoS レベル 1 は 1 回以上 届きます
  • QoS レベル 2 は 正確に 1 回 届きます

メッセージを送る時に QoS を定義します。

あとは MQTT ブローカーが良きに計らってくれます。

基本的には QoS 0 だそうです、確実に届けない課金情報などに 1 以上を使う感じでいくそうです。

ちなみに QoS が上がれば上がるほど MQTT ブローカーの性能に影響するそうです。

Retain

MQTT には独特な機能としてリテインという機能があります。これは トピック事に最新のメッセージを MQTT ブローカーが保持しておいてくれる という機能です。

The MQTT specification provides a provision for the publisher to request that the broker retain the last message sent to a topic and send it to all future topic subscribers. AWS IoT doesn’t support retained messages. If a request is made to retain messages, the connection is disconnected.

Will

will は遺言と呼ばれる機能です。何かしら想定外の切断が合った場合、事前に設定しておいたトピックに対して、遺言を送ります。

つまり、クライアントに 想定外の切断 が起きた事を知ることができます。

AWS Iot Coreではこの機能はないそうです、変わりにShadowがあるそうですね。

AWS IoTの特徴

スケールするMQTT/RESTブローカー

30秒に1回くらいしかデータが飛んでこない時も、秒間何万件というデータが飛んで来る時も安定して動く

AWS IoTの各メッセージブローカーはAWSマネージドとして必要に応じて独自にスケールアウトする

SQLライクにデータを管理するRuleエンジン

IoTデバイスからメッセージブローカーに送られてきたデータは[Ruleエンジン]というサブスクライバによって処理された上で他のAWSリソースに運ばれます。 Topicからデータを抜き取ってAWSリソースに送ります。

Ruleエンジンの例:

SELECT * FROM 'sdk/test/Python'

AWSリソースは(S3, DynamoDB, Kinesis, SNS, Lambda,SQS,CloudWatch)となっています。 他にもHTTPやMQTTの送信機能もありますね。 詳しくは以下の公式の開発者ガイドに乗っています。

AWSIot開発者ガイドルールアクション

デバイスを仮想的に管理するデバイスSHADOW

デバイスSHADOWとはAWSの中に仮想的に持っているデバイスオブジェクトのようなものです。

デバイスが現在どういう状態なのか、実際のデバイスはオフラインになったりして管理できないこともよくあります。

デバイスが最後に通信した時の状態をデバイスSHADOWが保持していて、デバイスが再びオンラインになった時点で同期をとるようです。

補足

デバイスシャドウに関してはまだ触ってないです。今度やるかもしれないし、やらないかもしれない。

ハンズオン

画面は20200915のものです、変わる可能性は大いにあります^q^ マネージメントコンソールからAWS IoT Coreを開きます。

オンボード>開始方法>開始方法をクリックします。

画面の通り、デバイスの登録と接続キットのダウンロードと接続テストをします。

ここでは開始方法をクリックします。

ツールはLinuxとPythonを選択しました。

モノの名前を登録します。ここでは「hands-on-iot-python」としました。 以降証明書等でこの名前が使用されます。

モノと、証明書とポリシーが作成されます、接続キットをダウンロードします。

手順に従って、接続テストをします。

以上の流れで,AWS Iot coreでモノを作成し、証明書とポリシーと関連付けすることに成功しました。 さらに掘り下げたい方はクイックスタートではなく、それぞれを個別に作成してみたらいいかもしれません。

最後のシェルスクリプトはやっていることはルート証明書をダウンロードして、aws-device-sdkをダウンロードして aws-iot-device-sdk-python/samples/basicPubSub/basicPubSub.pyのサンプルコードを動かしています。

curl https://www.amazontrust.com/repository/AmazonRootCA1.pem > root-CA.crt

PythonでMQTTでメッセージを発行

PubSubのPublisherを作成します。 ツールを解凍したディレクトリと同じ場所に作成します。

python3 -m pip install awsiotsdk

aws-iot-device-sdk-python-v2 github

[publishのサンプルコード]

from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
import time as t
import json
from datetime import datetime as dt


# 諸々設定
ENDPOINT = "エンドポイント.iot.ap-northeast-1.amazonaws.com"
CLIENT_ID = "basicPubSub"
PATH_TO_CERT = "./hands-on-iot-python.cert.pem"
PATH_TO_KEY = "./hands-on-iot-python.private.key"
PATH_TO_ROOT = "./hands-on-iot-python.cert.pem"
TOPIC = "sdk/test/Python"

# MQTT設定
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=ENDPOINT,
            cert_filepath=PATH_TO_CERT,
            pri_key_filepath=PATH_TO_KEY,
            client_bootstrap=client_bootstrap,
            ca_filepath=PATH_TO_ROOT,
            client_id=CLIENT_ID,
            clean_session=False,
            keep_alive_secs=6
            )


connect_future = mqtt_connection.connect()
connect_future.result()
print('Begin Publish')

# 作成時間記録用
tdatetime = dt.now()
tstr = tdatetime.strftime('%Y-%m-%d %H:%M:%S')


data = {
    "key":"hand-on-python",
    "data": tstr
}
message = {"message" : data}

# publishする場所はここ
mqtt_connection.publish(topic=TOPIC, payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE)



print("Published: '" + json.dumps(message) + "' to the topic: " + TOPIC)

# 接続を切る
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()

sdk/test/Pythonに一件のテストメッセージを発行します。

エンドポイントは左のツールバーの設定から参照することができます。 ご自身の環境で、調整してください。

証明書のhands-on-iot-python.cert.pemもモノで違う名前を使用した場合は環境に合わせて、変更してください。

特に大事なコードの説明

mqtt_connection.publish(topic=TOPIC, payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE)
  • mqtt_connection.publishでpublishできます。
  • topicでsdk/test/Pythonを指定しています。
  • payloadで中身、jsonを指定します。
  • qosは冒頭の説明があったように確実性を指定します、今回は0

QoS

  • AT_MOST_ONCE = 0
  • AT_LEAST_ONCE = 1
  • EXACTLY_ONCE = 2

mqtt.pyのソースコード

PythonでMQTTでメッセージを受信

続いてtopicから受信するサンプルコードです。

[subscribeのサンプルコード]

from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
import time as t
import json
import base64


def on_message_received(topic, payload, **kwargs):
    parsed = json.loads(payload.decode())
    print(parsed['message'])

ENDPOINT = "エンドポイント.iot.ap-northeast-1.amazonaws.com"
CLIENT_ID = "basicPubSub"
PATH_TO_CERT = "./hands-on-iot-python.cert.pem"
PATH_TO_KEY = "./hands-on-iot-python.private.key"
PATH_TO_ROOT = "./hands-on-iot-python.cert.pem"
TOPIC = 'sdk/test/Python'


event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=ENDPOINT,
            cert_filepath=PATH_TO_CERT,
            pri_key_filepath=PATH_TO_KEY,
            client_bootstrap=client_bootstrap,
            ca_filepath=PATH_TO_ROOT,
            client_id=CLIENT_ID,
            clean_session=False,
            keep_alive_secs=6
            )
connect_future = mqtt_connection.connect()
connect_future.result()


subscribe_future, packet_id = mqtt_connection.subscribe(
    topic=TOPIC,
    qos=mqtt.QoS.AT_LEAST_ONCE,
    callback = on_message_received
)
print('受信を開始')

# ここのsleepが受信する期間
t.sleep(10)



subscribe_result = subscribe_future.result()

# mqttコネクト終了
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()

特に大事なメソッド

subscribe_future, packet_id = mqtt_connection.subscribe(
    topic=TOPIC,
    qos=mqtt.QoS.AT_LEAST_ONCE,
    callback = on_message_received
)

subscribeメソッドは非同期で実行されます。 コネクト終了するまで実行され続けます。 値をtopicから受信するとcallbackが発動します。 今回はon_message_receivedメソッドが実行されます。

def on_message_received(topic, payload, **kwargs):
    parsed = json.loads(payload.decode())
    print(parsed['message'])

中身はpayloadのjsonをパージして’message’を表示しています。 メッセージが受信されるたびに発火します。

mqtt.pyのソースコード

まとめ

AWS Iot Coreを使用して、mqttプロトコルでpublish,subscribeしました。

Nakano
Nakano
Back-end engineer

AWS,Rails,UE4,vue.js,hugo,その他なんでもやりたい

関連項目