今回は、マイコンボードから取得したデータをシリアル通信で受信し、これをJSON形式に変換後、MQTTで、ラズパイへ送る方法を紹介します。
今回作成するアプリ
シリアルデータをJSON形式に変換後、MQTTで、サーバ(ラズパイ)へ送信。
想定する利用シーンとしては、複数のCANバスからのメッセージを受信して、これをクラウドへアップロードすることをイメージしています。
シリアルデータ形式
シリアルデータ形式は、以下を想定しています。
「COMMU_TYPE、ADDRESS、ID、DATA0、DATA1、DATA2、DATA3、DATA4、DATA5、DATA6、DATA7,」
・COMMU_TYPE: CANとか、I2Cとかセンサー値を取得するための通信バスの種類を想定。
・ADDRESS: センサー名称とか、通信バス名とかを想定。
・ID: メッセージIDを想定。CAN IDなど。
・DATA: センサ値や通信バスデータを想定。8バイトと仮定。
JSONデータ
JSONデータは、以下の構成を想定しました。
{
”TIME”:[UNIX-TIME, FreeRun-time],
“COMMU_TYPE” : type,
“ADDRESS” : address,
“ID” : id,
”DATA” : [DATA0, DATA1, DATA2, DATA3, DATA4, DATA5, DATA6, DATA7]
}
JSONデータでは、シリアルデータに加えて、TIME情報を付与します。TIME情報は、ATOM内で更新される時刻情報です。これにより、サーバに蓄積された後、データ解析に使用できます。
UNIX-TIMEは、現在時刻をUNIX時刻で表現したもの。
FreeRun-timeは、ATOM起動から現在までをミリ秒でカウントしたものです。メッセージ間隔をミリ秒で計測することを想定して用意しています。
構成
・ATOM Matrix/Lite × 2台
・Raspberry Pi
・ArduinoIDE
ATOMの準備
ATOMのサンプルコード
#include "M5Atom.h"
#include <WiFi.h>
#include <PubSubClient.h>
#include <stdio.h>
#include <string.h>
#include <ArduinoJson.h>
#include "time.h"
// Wi-FiのSSID
char *ssid = "************";
// Wi-Fiのパスワード
char *password = "**************";
// MQTTの接続先(ラズパイ)のIP
const char *endpoint = "***.***.***.***";
// MQTTのポート
const int port = 1883;
// デバイスID デバイスIDは機器ごとにユニークにします
char *deviceID = "ATOM";
// メッセージを知らせるトピック
char *pubTopic = "/pub/ATOM";
// メッセージを待つトピック
char *subTopic = "/sub/ATOM";
#define JST 3600* 9
WiFiClient httpsClient;
PubSubClient mqttClient(httpsClient);
// --------------------
//----------------------------------------------------------------
// 日時 <-> 1970/1/1 00:00:00からの秒数(UNIXの日付) 相互変換
//----------------------------------------------------------------
#define SECONDS_IN_DAY 86400 // 24 * 60 * 60
#define SECONDS_IN_HOUR 3600 // 60 * 60
typedef struct {
int year;
byte month;
byte day;
byte hour;
byte min;
byte sec;
} date_time;
date_time tm;
//----------------------------------------------------------------
// 西暦0年1月1日からの日数を返す
//----------------------------------------------------------------
unsigned long calc_0_days(int year, byte month, byte day) {
unsigned long days;
int daysinmonth_ruiseki[12] = {0,31,59,90,120,151,181,212,243,273,304,334};
year--; // 当年は含まず
days = (unsigned long)year * 365;
days += year / 4; // 閏年の日数を足しこむ
days -= year/100; // 閏年で無い日数を差し引く
days += year/400; // 差し引きすぎた日数を足しこむ
days += (unsigned long)daysinmonth_ruiseki[month-1];
if( is_leapyear( year ) && 3 <= month ) {
day++;
}
days += (unsigned long)day;
return days;
}
//----------------------------------------------------------------
// 西暦1970年1月1日からの日数を返す
//----------------------------------------------------------------
unsigned long calc_unix_days(int year, byte month, byte day) {
unsigned long days;
return calc_0_days(year, month, day) - calc_0_days(1970, 1, 1);
}
//----------------------------------------------------------------
// 西暦1970年1月1日 00:00:00 からの秒数を返す
//----------------------------------------------------------------
unsigned long calc_unix_seconds(int year, byte month, byte day, byte hour, byte minutes, byte second) {
unsigned long days;
unsigned long seconds;
days = calc_unix_days(year, month, day);
seconds = days * SECONDS_IN_DAY;
seconds += (unsigned long)hour * SECONDS_IN_HOUR;
seconds += (unsigned long)minutes * 60;
seconds += (unsigned long)second;
seconds += -9 * SECONDS_IN_HOUR; // JPN(GMT+9) 日本時間
return seconds;
}
//----------------------------------------------------------------
// 閏年か否かを返す
//----------------------------------------------------------------
boolean is_leapyear( int year ){
if( (year % 400) == 0 || ((year % 4) == 0 && (year % 100) != 0)) {
return true;
}else{
return false;
}
}
// --------------------
void setup_wifi(){
Serial.println("Connecting to ");
Serial.print(ssid);
// WiFi接続性改善のため、いったん切断
WiFi.disconnect( true, true ); //WiFi OFF, eraseAP=true
delay(500);
// WiFi開始
WiFi.begin(ssid, password);
// Wi-Fi接続待ち
while (WiFi.status() != WL_CONNECTED){
delay(500);
}
// WiFi接続成功メッセージの表示
Serial.println("WiFi Connected.");
// M5StackのIPアドレスを表示
Serial.println(WiFi.localIP());
}
void setup() {
M5.begin();
delay(10);
Serial.begin(115200);
// Wi-Fi処理の開始
setup_wifi();
configTime(JST, 0, "ntp.nict.jp", "ntp.jst.mfeed.ad.jp");
// シリアル通信機能の設定
// Serial2.begin(unsigned long baud, uint32_t config, int8_t rxPin, int8_t txPin, bool invert)
Serial2.begin(115200, SERIAL_8N1, 26, 32);
// MQTTクライアントの初期化
mqttClient.setServer(endpoint, port);
// MQTT接続開始
connectMQTT();
}
// MQTT接続処理
void connectMQTT(){
// MQTT接続待ち
while (!mqttClient.connected()){
if (mqttClient.connect(deviceID)){
// 所定のデバイスと接続した
Serial.println("Connected.");
int qos = 0;
mqttClient.subscribe(subTopic, qos);
Serial.println("Subscribed.");
}else{
// 所定のデバイスと接続失敗 ⇒ 再トライする
Serial.print("Failed. Error state=");
Serial.print(mqttClient.state());
// Wait 5 seconds before retrying
delay(5000);
}
}
}
// MQTT接続管理
void mqttLoop(){
// MQTT接続状態を監視し、切断されていたら、再接続を試みる
if (!mqttClient.connected()){
connectMQTT();
}
// MQTTクライアント側の定期処理の実行
mqttClient.loop();
}
// 文字列 = split(元の文字列,分割文字列,分割した後何番目の文字列か)
String split(String data, char separator, int index){
int found = 0;
int strIndex[] = {0, -1};
int maxIndex = data.length();
int endcheck = false;
for(int i=0; i < maxIndex; i++){
if( data.charAt(i) == separator ){
found++;
if ( found == index ){ strIndex[0] = i+1; }
if ( found == index+1 ){ strIndex[1] = i; endcheck=true; }
if(endcheck){ break; }
}
}
return (strIndex[1]==-1) ? data.substring(strIndex[0]) : data.substring(strIndex[0], strIndex[1]);
}
//分割文字の数
int Keywordcounter(String Original,char Keyword){
int Keywordcount = 0;
for (int i = 0; i < Original.length(); i++){
if (Original.charAt(i) == Keyword) {
Keywordcount++;
}
}
return Keywordcount;
}
void loop() {
struct tm timeinfo;
unsigned long unix_seconds;
StaticJsonDocument<500> doc;
char pubMessage[256];
mqttLoop(); // MQTT接続監視
// シリアル通信受信時にメッセージを送信する
int recieved_byte_size = Serial2.available();
if (recieved_byte_size){
String recieveData = Serial2.readStringUntil('\n');
Serial.println("シリアル通信データ:" + String(recieveData));
//分割文字の数
int count = Keywordcounter(recieveData, ',');
String part[count+1];
// シリアルデータの分割
for (int i = 0; i < count+1; i++){
part[i] = split(recieveData,',',i);
}
// 現在時刻を取得し、Unixタイムに変換
if(getLocalTime(&timeinfo)){
unix_seconds = calc_unix_seconds(
timeinfo.tm_year+1900,
timeinfo.tm_mon,
timeinfo.tm_mday,
timeinfo.tm_hour,
timeinfo.tm_min,
timeinfo.tm_sec);
}else{
unix_seconds = calc_unix_seconds(2020,10,14,18,32,20);
}
// JSONメッセージの作成
JsonArray timeValues = doc.createNestedArray("TIME");
timeValues.add(unix_seconds);
timeValues.add(millis());
JsonArray typeValues = doc.createNestedArray("COMMU_TYPE");
typeValues.add(part[0]);
JsonArray addressValues = doc.createNestedArray("ADDRESS");
addressValues.add(part[1]);
JsonArray idValues = doc.createNestedArray("ID");
idValues.add(part[2]);
JsonArray dataValues = doc.createNestedArray("DATA");
dataValues.add(part[3]);
dataValues.add(part[4]);
dataValues.add(part[5]);
dataValues.add(part[6]);
dataValues.add(part[7]);
dataValues.add(part[8]);
dataValues.add(part[9]);
dataValues.add(part[10]);
serializeJson(doc, pubMessage);
// メッセージの画面表示
Serial.print("topic= ");
Serial.println(pubTopic);
Serial.print("Message= ");
Serial.println(pubMessage);
// メッセージのPublish
mqttClient.publish(pubTopic, pubMessage);
Serial.println("Published.");
Serial.println("");
}else{
}
}
上のサンプルコードでは、簡単に言うと以下の処理を行っています。
・シリアルデータの受信: 受信したシリアルデータを、” , ”単位で分割して格納
・JSONデータ生成: 分割したデータを、JSONデータ形式に生成
・MQTTにて、メッセージをPublish
ラズパイの準備
ライブラリのインストール
python版MQTTライブラリであるpaho.mqttをインストールします。
$ sudo pip install paho-mqtt
ラズパイのサンプルコード
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
import json
import time
import concurrent.futures
MQTT_PORT = 1883
KEEP_ALIVE = 60
TOPIC = "/pub/ATOM"
#Brokerに接続できたとき
def on_connect(client, userdata, flag, rc):
print("connected.")
client.subscribe(TOPIC)
#Brokerと切断したとき
def on_disconnect(client, userdata, flag, rc):
if rc != 0:
print("disconnected.")
#メッセージ受信
def on_message(client, userdata, msg):
received = msg.payload.decode("utf-8","ignore")
received = json.loads(received)
print(received)
for key_data in received:
print(key_data)
print(received["TIME"])
print(received["ID"])
print(received["DATA"][0])
for val_data in received.values():
print(val_data)
def task1():
client = mqtt.Client()
#コールバックを登録
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_message = on_message
client.connect("localhost", MQTT_PORT, KEEP_ALIVE)
print("start mqtt")
#待ち受け
client.loop_forever()
def task2():
while True:
# print("task2")
time.sleep(2)
if __name__ == "__main__":
executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
executor.submit(task1)
executor.submit(task2)
ラズパイでは、以下を処理しています。
・MQTT接続時処理
・MQTTメッセージ受信時の処理
・MQTT切断時の処理
今後のことも考えて、マルチタスク化しています(task1、task2)。
ATOM(テストデータ入力)の準備
ATOM(テストデータ入力)のサンプルコード
#include "M5Atom.h"
void setup(){
M5.begin(true, false, true);
delay(10);
// シリアル通信機能の設定
// Serial2.begin(unsigned long baud, uint32_t config, int8_t rxPin, int8_t txPin, bool invert)
Serial2.begin(115200, SERIAL_8N1, 26, 32);
}
void loop() {
// シリアル通信を受信したときの処理
if(Serial.available() > 0) {
// 受信データを取得
char ch = Serial.read();
// 受信データをラズパイ側へそのまま送信
Serial.print(ch);
Serial2.print(ch);
}else{
}
M5.update(); // update button state
}
テストデータ生成用のATOMは、PCからのテストデータを、ATOMのSerial2にゲートウエイしているだけです。
まとめ
今回は、シリアルデータをJSON形式に変換し、これをMQTTでラズパイに送信する方法を紹介しました。
コメント