シリアルデータをJSON形式に変換して、MQTTで、M5Stackからラズパイへ送る方法

ATOM Matrix/Lite

今回は、マイコンボードから取得したデータをシリアル通信で受信し、これをJSON形式に変換後、MQTTで、ラズパイへ送る方法を紹介します。

スポンサーリンク

今回作成するアプリ

シリアルデータをJSON形式に変換後、MQTTで、サーバ(ラズパイ)へ送信。

想定する利用シーンとしては、複数のCANバスからのメッセージを受信して、これをクラウドへアップロードすることをイメージしています。

シリアルデータ形式

シリアルデータ形式は、以下を想定しています。

「COMMU_TYPE、ADDRESS、ID、DATA0、DATA1、DATA2、DATA3、DATA4、DATA5、DATA6、DATA7,」

・COMMU_TYPE: CANとか、I2Cとかセンサー値を取得するための通信バスの種類を想定。

・ADDRESS: センサー名称とか、通信バス名とかを想定。

・ID: メッセージIDを想定。CAN IDなど。

・DATA: センサ値や通信バスデータを想定。8バイトと仮定。

JSONデータ

JSONデータは、以下の構成を想定しました。

{
 ”TIME”:[UNIX-TIME, FreeRun-time],
“COMMU_TYPE” : type,
“ADDRESS” : address,
“ID” : id,
 ”DATA” : [DATA0, DATA1, DATA2, DATA3, DATA4, DATA5, DATA6, DATA7]
}

JSONデータでは、シリアルデータに加えて、TIME情報を付与します。TIME情報は、ATOM内で更新される時刻情報です。これにより、サーバに蓄積された後、データ解析に使用できます。

UNIX-TIMEは、現在時刻をUNIX時刻で表現したもの。

FreeRun-timeは、ATOM起動から現在までをミリ秒でカウントしたものです。メッセージ間隔をミリ秒で計測することを想定して用意しています。

構成

・ATOM Matrix/Lite × 2台

・Raspberry Pi

・ArduinoIDE

ATOMの準備

ATOMのサンプルコード

#include "M5Atom.h"

#include <WiFi.h>
#include <PubSubClient.h>

#include <stdio.h>
#include <string.h>
#include <ArduinoJson.h>

#include "time.h"

// Wi-FiのSSID
char *ssid = "************";
// Wi-Fiのパスワード
char *password = "**************";

// MQTTの接続先(ラズパイ)のIP
const char *endpoint = "***.***.***.***";

// MQTTのポート
const int port = 1883;

// デバイスID デバイスIDは機器ごとにユニークにします
char *deviceID = "ATOM";

// メッセージを知らせるトピック
char *pubTopic = "/pub/ATOM";

// メッセージを待つトピック
char *subTopic = "/sub/ATOM";

#define JST 3600* 9

WiFiClient httpsClient;
PubSubClient mqttClient(httpsClient);

// --------------------
//----------------------------------------------------------------
// 日時 <-> 1970/1/1 00:00:00からの秒数(UNIXの日付) 相互変換
//----------------------------------------------------------------
#define SECONDS_IN_DAY   86400   // 24 * 60 * 60
#define SECONDS_IN_HOUR  3600    // 60 * 60

typedef struct {
  int  year;
  byte month;
  byte day;
  byte hour;
  byte min;
  byte sec;
} date_time;

date_time tm;

//----------------------------------------------------------------
// 西暦0年1月1日からの日数を返す
//----------------------------------------------------------------
unsigned long calc_0_days(int year, byte month, byte day) {
  unsigned long days;
  int daysinmonth_ruiseki[12] = {0,31,59,90,120,151,181,212,243,273,304,334};
  
  year--; // 当年は含まず 
  days = (unsigned long)year * 365;
  days += year / 4;  // 閏年の日数を足しこむ
  days -= year/100;  // 閏年で無い日数を差し引く
  days += year/400;  // 差し引きすぎた日数を足しこむ
  days += (unsigned long)daysinmonth_ruiseki[month-1];
  if( is_leapyear( year ) &&  3 <= month ) {
    day++;
  }
  days += (unsigned long)day;
  
  return days;
}

//----------------------------------------------------------------
// 西暦1970年1月1日からの日数を返す
//----------------------------------------------------------------
unsigned long calc_unix_days(int year, byte month, byte day) {
  unsigned long days;

  return calc_0_days(year, month, day) - calc_0_days(1970, 1, 1);
}

//----------------------------------------------------------------
// 西暦1970年1月1日 00:00:00 からの秒数を返す
//----------------------------------------------------------------
unsigned long calc_unix_seconds(int year, byte month, byte day, byte hour, byte minutes, byte second) {
  unsigned long days;
  unsigned long seconds;

  days = calc_unix_days(year, month, day);
  seconds = days * SECONDS_IN_DAY;
  seconds += (unsigned long)hour * SECONDS_IN_HOUR;
  seconds += (unsigned long)minutes * 60;
  seconds += (unsigned long)second;
  seconds += -9 * SECONDS_IN_HOUR; // JPN(GMT+9) 日本時間

  return seconds;
}
//----------------------------------------------------------------
// 閏年か否かを返す
//----------------------------------------------------------------
boolean is_leapyear( int year ){
  if( (year % 400) == 0 || ((year % 4) == 0 && (year % 100) != 0)) {
    return true;
  }else{
    return false;
  }
}
// --------------------

void setup_wifi(){
  
  Serial.println("Connecting to ");
  Serial.print(ssid);

  // WiFi接続性改善のため、いったん切断
  WiFi.disconnect( true, true ); //WiFi OFF, eraseAP=true
  delay(500);
  
  // WiFi開始
  WiFi.begin(ssid, password);
 
  // Wi-Fi接続待ち
  while (WiFi.status() != WL_CONNECTED){
    delay(500);
  }

  // WiFi接続成功メッセージの表示
  Serial.println("WiFi Connected.");

  // M5StackのIPアドレスを表示
  Serial.println(WiFi.localIP());
}

void setup() {

  M5.begin();
  delay(10);
  Serial.begin(115200);

  // Wi-Fi処理の開始
  setup_wifi();
  
  configTime(JST, 0, "ntp.nict.jp", "ntp.jst.mfeed.ad.jp");
  
  // シリアル通信機能の設定
  // Serial2.begin(unsigned long baud, uint32_t config, int8_t rxPin, int8_t txPin, bool invert)
  Serial2.begin(115200, SERIAL_8N1, 26, 32);

  // MQTTクライアントの初期化
  mqttClient.setServer(endpoint, port);

  // MQTT接続開始
  connectMQTT();
  
}

// MQTT接続処理
void connectMQTT(){

  // MQTT接続待ち
  while (!mqttClient.connected()){
    if (mqttClient.connect(deviceID)){
      // 所定のデバイスと接続した
      Serial.println("Connected.");
      int qos = 0;
      mqttClient.subscribe(subTopic, qos);
      Serial.println("Subscribed.");
    }else{
      // 所定のデバイスと接続失敗 ⇒ 再トライする
      Serial.print("Failed. Error state=");
      Serial.print(mqttClient.state());
      // Wait 5 seconds before retrying
      delay(5000);
    }
  }
}

// MQTT接続管理
void mqttLoop(){
  // MQTT接続状態を監視し、切断されていたら、再接続を試みる
  if (!mqttClient.connected()){
    connectMQTT();
  }
  // MQTTクライアント側の定期処理の実行
  mqttClient.loop();
}

// 文字列 = split(元の文字列,分割文字列,分割した後何番目の文字列か)
String split(String data, char separator, int index){
    int found = 0;
    int strIndex[] = {0, -1};
    int maxIndex = data.length();
    int endcheck = false;

    for(int i=0; i < maxIndex; i++){
        if( data.charAt(i) == separator ){
            found++;
            if ( found == index ){ strIndex[0] = i+1; }
            if ( found == index+1 ){ strIndex[1] = i; endcheck=true; }
            if(endcheck){ break; }
        }
    }
    return (strIndex[1]==-1) ? data.substring(strIndex[0]) : data.substring(strIndex[0], strIndex[1]);
}
//分割文字の数
int Keywordcounter(String Original,char Keyword){
    int Keywordcount = 0;
    for (int i = 0; i < Original.length(); i++){
        if (Original.charAt(i) == Keyword) { 
          Keywordcount++; 
        }
    }
    return Keywordcount;
}

void loop() {

  struct tm timeinfo;
  unsigned long unix_seconds;
  
  StaticJsonDocument<500> doc;
  char pubMessage[256];
  
  mqttLoop();   // MQTT接続監視

  // シリアル通信受信時にメッセージを送信する
  int recieved_byte_size = Serial2.available();
  if (recieved_byte_size){
    
    String recieveData = Serial2.readStringUntil('\n');

    Serial.println("シリアル通信データ:" + String(recieveData));

    //分割文字の数
    int count = Keywordcounter(recieveData, ',');
    String part[count+1];

    // シリアルデータの分割
    for (int i = 0; i < count+1; i++){
        part[i] = split(recieveData,',',i);
    }

    // 現在時刻を取得し、Unixタイムに変換
    if(getLocalTime(&timeinfo)){
      unix_seconds = calc_unix_seconds(
      timeinfo.tm_year+1900,
      timeinfo.tm_mon,
      timeinfo.tm_mday,
      timeinfo.tm_hour,
      timeinfo.tm_min,
      timeinfo.tm_sec);
    }else{
      unix_seconds = calc_unix_seconds(2020,10,14,18,32,20);
    }

    // JSONメッセージの作成
    JsonArray timeValues = doc.createNestedArray("TIME");
    timeValues.add(unix_seconds);
    timeValues.add(millis());
    
    JsonArray typeValues = doc.createNestedArray("COMMU_TYPE");
    typeValues.add(part[0]);

    JsonArray addressValues = doc.createNestedArray("ADDRESS");
    addressValues.add(part[1]);

    JsonArray idValues = doc.createNestedArray("ID");
    idValues.add(part[2]);

    JsonArray dataValues = doc.createNestedArray("DATA");
    dataValues.add(part[3]);
    dataValues.add(part[4]);
    dataValues.add(part[5]);
    dataValues.add(part[6]);
    dataValues.add(part[7]);
    dataValues.add(part[8]);
    dataValues.add(part[9]);
    dataValues.add(part[10]);
    
    serializeJson(doc, pubMessage);

    // メッセージの画面表示
    Serial.print("topic= ");
    Serial.println(pubTopic);
    Serial.print("Message= ");
    Serial.println(pubMessage);

    // メッセージのPublish
    mqttClient.publish(pubTopic, pubMessage);
    Serial.println("Published.");
    Serial.println("");
    
  }else{
  }

}

上のサンプルコードでは、簡単に言うと以下の処理を行っています。

・シリアルデータの受信: 受信したシリアルデータを、” , ”単位で分割して格納

・JSONデータ生成: 分割したデータを、JSONデータ形式に生成

・MQTTにて、メッセージをPublish

ラズパイの準備

ライブラリのインストール

python版MQTTライブラリであるpaho.mqttをインストールします。

$ sudo pip install paho-mqtt

ラズパイのサンプルコード

# -*- coding: utf-8 -*-

import paho.mqtt.client as mqtt
import json
import time
import concurrent.futures

MQTT_PORT = 1883
KEEP_ALIVE = 60
TOPIC = "/pub/ATOM"

#Brokerに接続できたとき
def on_connect(client, userdata, flag, rc):
  print("connected.")
  client.subscribe(TOPIC)

#Brokerと切断したとき
def on_disconnect(client, userdata, flag, rc):

  if  rc != 0:
    print("disconnected.")

#メッセージ受信
def on_message(client, userdata, msg):
  received = msg.payload.decode("utf-8","ignore")
  received = json.loads(received)
  print(received)

  for key_data in received:
    print(key_data)

  print(received["TIME"])
  print(received["ID"])
  print(received["DATA"][0])

  for val_data in received.values():
    print(val_data)

def task1():

  client = mqtt.Client()
  #コールバックを登録
  client.on_connect = on_connect
  client.on_disconnect = on_disconnect
  client.on_message = on_message
  client.connect("localhost", MQTT_PORT, KEEP_ALIVE)

  print("start mqtt")
  #待ち受け
  client.loop_forever()

def task2():
  while True:
#    print("task2")
    time.sleep(2)

if __name__ == "__main__":

  executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
  executor.submit(task1)
  executor.submit(task2)

ラズパイでは、以下を処理しています。

・MQTT接続時処理

・MQTTメッセージ受信時の処理

・MQTT切断時の処理

今後のことも考えて、マルチタスク化しています(task1、task2)。

ATOM(テストデータ入力)の準備

ATOM(テストデータ入力)のサンプルコード

#include "M5Atom.h"

void setup(){

  M5.begin(true, false, true);
  delay(10);
    
  // シリアル通信機能の設定
  // Serial2.begin(unsigned long baud, uint32_t config, int8_t rxPin, int8_t txPin, bool invert)
  Serial2.begin(115200, SERIAL_8N1, 26, 32);

}

void loop() {
  
  // シリアル通信を受信したときの処理
  if(Serial.available() > 0) {

    // 受信データを取得
    char ch = Serial.read();
    // 受信データをラズパイ側へそのまま送信
    Serial.print(ch);
    Serial2.print(ch);
        
  }else{
  }
   
  M5.update();      // update button state
}

テストデータ生成用のATOMは、PCからのテストデータを、ATOMのSerial2にゲートウエイしているだけです。

まとめ

今回は、シリアルデータをJSON形式に変換し、これをMQTTでラズパイに送信する方法を紹介しました。

参考

arduino日付関数

コメント

タイトルとURLをコピーしました