陈拓 2021/04/19-2021/04/20
1. 概述
在《将MQTT收到的数据保存到MySQL数据库》
https://zhuanlan.zhihu.com/p/365940502
https://blog.csdn.net/chentuo2000/article/details/115862060
一文中我们用同步的方法将MQTT订阅消息中的数据存储到了MySQL数据库中。
本文的程序用异步方式订阅消息。异步订阅消息的说明见:
《用C语言实现mosquitto MQTT订阅消息(异步)》
https://zhuanlan.zhihu.com/p/365483724
https://blog.csdn.net/chentuo2000/article/details/115786111
2. 异步mysql_log.c程序代码
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
static int run = 1;
-
static MYSQL_STMT _stmt = NULL;
-
-
void handle_signal(int s)
-
{
-
printf(“\n Capture sign no:%d\n”, s);
-
run = 0;
-
}
-
-
void connect_callback(struct mosquitto _mosq, void _obj, int reason_code)
-
{
-
}
-
-
void message_callback(struct mosquitto _mosq, void _obj, const struct mosquitto_message _message)
-
{
-
printf(“%s %d %s\n”, message->topic, message->qos, (char _)message->payload);
-
-
MYSQL_BIND bind[2];
-
-
memset(bind, 0, sizeof(bind));
-
-
bind[0].buffer_type = MYSQL_TYPE_STRING;
-
bind[0].buffer = message->topic;
-
bind[0].buffer_length = strlen(message->topic);
-
// Note: payload is normally a binary blob and could contains
-
// NULL byte. This sample does not handle it and assume payload is a
-
// string.
-
bind[1].buffer_type = MYSQL_TYPE_STRING;
-
bind[1].buffer = message->payload;
-
bind[1].buffer_length = message->payloadlen;
-
-
mysql_stmt_bind_param(stmt, bind);
-
mysql_stmt_execute(stmt);
-
}
-
-
int main(int argc, char _argv[])
-
{
-
MYSQL *connection;
-
my_bool reconnect = true;
-
char clientid[24];
-
struct <span class=”hljs-title class_“>mosquitto _mosq;
-
int rc = 0;
-
-
signal(SIGINT, handle_signal); // 捕捉终端CTRL+c产生的SIGINT信号
-
signal(SIGTERM, handle_signal); // 程序结束(terminate)信号
-
-
mysql_library_init(0, NULL, NULL);
-
mosquitto_lib_init();
-
-
connection = mysql_init(NULL);
-
-
if(connection){
-
mysql_options(connection, MYSQL_OPT_RECONNECT, &reconnect);
-
-
connection = mysql_real_connect(connection, db_host, db_username, db_password, db_database, db_port, NULL, 0);
-
-
if(connection){
-
stmt = mysql_stmt_init(connection);
-
-
mysql_stmt_prepare(stmt, db_query, strlen(db_query));
-
-
memset(clientid, 0, 24);
-
snprintf(clientid, 23, “mysql_log_%d”, getpid());
-
mosq = mosquitto_new(clientid, true, connection);
-
if(mosq){
-
mosquitto_connect_callback_set(mosq, connect_callback);
-
mosquitto_message_callback_set(mosq, message_callback);
-
-
mosquitto_username_pw_set(mosq, “ct”, “1qaz2wsx”);
-
rc = mosquitto_connect_async(mosq, mqtt_host, mqtt_port, 60);
-
if(rc != MOSQ_ERR_SUCCESS) {
-
mosquitto_destroy(mosq);
-
fprintf(stderr, “Error: %s\n”, mosquitto_strerror(rc));
-
return 1;
-
}
-
-
mosquitto_subscribe(mosq, NULL, “#”, 0);
-
-
rc = mosquitto_loop_start(mosq);
-
if(rc != MOSQ_ERR_SUCCESS) {
-
mosquitto_destroy(mosq);
-
fprintf(stderr, “Error: %s\n”, mosquitto_strerror(rc));
-
return 1;
-
}
-
-
printf(“Start!\n”);
-
while(run) {
-
sleep(20);
-
}
-
}
-
mysql_stmt_close(stmt);
-
mysql_close(connection);
-
}else{
-
fprintf(stderr, “Error: Unable to connect to database.\n”);
-
printf(“%s\n”, mysql_error(connection));
-
rc = 1;
-
}
-
}else{
-
fprintf(stderr, “Error: Unable to start mysql.\n”);
-
rc = 1;
-
}
-
-
mysql_library_end();
-
mosquitto_destroy(mosq);
-
mosquitto_lib_cleanup();
-
printf(“End!\n”);
-
-
return rc;
-
}
说明:
(1) 通配符#的使用
mosquitto_subscribe(mosq, NULL, “#”, 0);
在订阅中使用通配符可以收到多个主题的消息,通配符的详细用法可以看MQTT文档。
(2) CTRL+c正常结束程序
signal(SIGINT, handle_signal); // 捕捉终端CTRL+c产生的SIGINT信号
通过捕捉终端操作CTRL+c使程序能够正常退出以释放资源,而不是用CTRL+z强行终止程序的运行。
(3) 使用异步连接mosquitto_connect_async
(4) 使用异步循环mosquitto_loop_start
-
while(run) {
-
sleep(20);
-
}
- 编辑mysql_log.c
nano mysql_log.c
3. 编译、测试
编译
make
- 本地测试
./mosquitto_mysql_log
- 发布消息
再打开一个终端窗口,发送:
mosquitto_pub -p 1883 -u ct -P 1qaz2wsx -t temperature001 -m “25.1”
- 查看订阅窗口
- 远程测试
详细说明见《树莓派MQTT服务远程测试MQTT.fx》
https://zhuanlan.zhihu.com/p/363373024
https://blog.csdn.net/chentuo2000/article/details/115539377
点击Publish。
- 订阅测试窗口收到消息
- 退出程序
CTRL+c正常退出:
- 查看数据库
select _ from temperature where deviceid like ‘temperature%’;
评论(0)
您还未登录,请登录后发表或查看评论