陈拓 2021/04/19-2021/04/20

1. 概述

在《将MQTT收到的数据保存到MySQL数据库》


https://zhuanlan.zhihu.com/p/365940502


https://blog.csdn.net/chentuo2000/article/details/115862060


一文中我们用同步的方法将MQTT订阅消息中的数据存储到了MySQL数据库中。


本文的程序用异步方式订阅消息。异步订阅消息的说明见:


《用C语言实现mosquitto MQTT订阅消息(异步)》


https://zhuanlan.zhihu.com/p/365483724


https://blog.csdn.net/chentuo2000/article/details/115786111


2. 异步mysql_log.c程序代码


  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <unistd.h>
  5. #include <signal.h>
  6. #include <mosquitto.h>
  7. #include <mysql.h>
  8. #define db_host “localhost”
  9. #define db_username “ct”
  10. #define db_password “ct”
  11. #define db_database “smarthome”
  12. #define db_port 3306
  13. #define db_query “INSERT INTO temperature (deviceid, celsius_temp) VALUES (?,?)”
  14. #define mqtt_host “localhost”
  15. #define mqtt_port 1883
  16. static int run = 1;
  17. static MYSQL_STMT _stmt = NULL;
  18. void handle_signal(int s)
  19. {
  20. printf(“\n Capture sign no:%d\n”, s);
  21. run = 0;
  22. }
  23. void connect_callback(struct mosquitto _mosq, void _obj, int reason_code)
  24. {
  25. }
  26. void message_callback(struct mosquitto _mosq, void _obj, const struct mosquitto_message _message)
  27. {
  28. printf(“%s %d %s\n”, message->topic, message->qos, (char _)message->payload);
  29. MYSQL_BIND bind[2];
  30. memset(bind, 0, sizeof(bind));
  31. bind[0].buffer_type = MYSQL_TYPE_STRING;
  32. bind[0].buffer = message->topic;
  33. bind[0].buffer_length = strlen(message->topic);
  34. // Note: payload is normally a binary blob and could contains
  35. // NULL byte. This sample does not handle it and assume payload is a
  36. // string.
  37. bind[1].buffer_type = MYSQL_TYPE_STRING;
  38. bind[1].buffer = message->payload;
  39. bind[1].buffer_length = message->payloadlen;
  40. mysql_stmt_bind_param(stmt, bind);
  41. mysql_stmt_execute(stmt);
  42. }
  43. int main(int argc, char _argv[])
  44. {
  45. MYSQL *connection;
  46. my_bool reconnect = true;
  47. char clientid[24];
  48. struct <span class=”hljs-title class_“>mosquitto _mosq;
  49. int rc = 0;
  50. signal(SIGINT, handle_signal); // 捕捉终端CTRL+c产生的SIGINT信号
  51. signal(SIGTERM, handle_signal); // 程序结束(terminate)信号
  52. mysql_library_init(0, NULL, NULL);
  53. mosquitto_lib_init();
  54. connection = mysql_init(NULL);
  55. if(connection){
  56. mysql_options(connection, MYSQL_OPT_RECONNECT, &reconnect);
  57. connection = mysql_real_connect(connection, db_host, db_username, db_password, db_database, db_port, NULL, 0);
  58. if(connection){
  59. stmt = mysql_stmt_init(connection);
  60. mysql_stmt_prepare(stmt, db_query, strlen(db_query));
  61. memset(clientid, 0, 24);
  62. snprintf(clientid, 23, “mysql_log_%d”, getpid());
  63. mosq = mosquitto_new(clientid, true, connection);
  64. if(mosq){
  65. mosquitto_connect_callback_set(mosq, connect_callback);
  66. mosquitto_message_callback_set(mosq, message_callback);
  67. mosquitto_username_pw_set(mosq, “ct”, “1qaz2wsx”);
  68. rc = mosquitto_connect_async(mosq, mqtt_host, mqtt_port, 60);
  69. if(rc != MOSQ_ERR_SUCCESS) {
  70. mosquitto_destroy(mosq);
  71. fprintf(stderr, “Error: %s\n”, mosquitto_strerror(rc));
  72. return 1;
  73. }
  74. mosquitto_subscribe(mosq, NULL, “#”, 0);
  75. rc = mosquitto_loop_start(mosq);
  76. if(rc != MOSQ_ERR_SUCCESS) {
  77. mosquitto_destroy(mosq);
  78. fprintf(stderr, “Error: %s\n”, mosquitto_strerror(rc));
  79. return 1;
  80. }
  81. printf(“Start!\n”);
  82. while(run) {
  83. sleep(20);
  84. }
  85. }
  86. mysql_stmt_close(stmt);
  87. mysql_close(connection);
  88. }else{
  89. fprintf(stderr, “Error: Unable to connect to database.\n”);
  90. printf(“%s\n”, mysql_error(connection));
  91. rc = 1;
  92. }
  93. }else{
  94. fprintf(stderr, “Error: Unable to start mysql.\n”);
  95. rc = 1;
  96. }
  97. mysql_library_end();
  98. mosquitto_destroy(mosq);
  99. mosquitto_lib_cleanup();
  100. printf(“End!\n”);
  101. return rc;
  102. }



说明:


(1) 通配符#的使用


mosquitto_subscribe(mosq, NULL, “#”, 0);


在订阅中使用通配符可以收到多个主题的消息,通配符的详细用法可以看MQTT文档。


(2) CTRL+c正常结束程序


signal(SIGINT, handle_signal); // 捕捉终端CTRL+c产生的SIGINT信号


通过捕捉终端操作CTRL+c使程序能够正常退出以释放资源,而不是用CTRL+z强行终止程序的运行。


(3) 使用异步连接mosquitto_connect_async


(4) 使用异步循环mosquitto_loop_start


  1. while(run) {
  2. sleep(20);
  3. }

  • 编辑mysql_log.c

nano mysql_log.c



3. 编译、测试


编译


make



  • 本地测试

./mosquitto_mysql_log



  • 发布消息

再打开一个终端窗口,发送:


mosquitto_pub -p 1883 -u ct -P 1qaz2wsx -t temperature001 -m “25.1”



  • 查看订阅窗口


  • 远程测试

详细说明见《树莓派MQTT服务远程测试MQTT.fx》


https://zhuanlan.zhihu.com/p/363373024


https://blog.csdn.net/chentuo2000/article/details/115539377



点击Publish


  • 订阅测试窗口收到消息


  • 退出程序

CTRL+c正常退出:



  • 查看数据库

select _ from temperature where deviceid like ‘temperature%’;