45 #include "contiki-net.h" 46 #include "contiki-lib.h" 47 #include "lib/random.h" 55 #include "tcp-socket.h" 57 #include "lib/assert.h" 67 #define PRINTF(...) printf(__VA_ARGS__) 73 MQTT_FHDR_MSG_TYPE_CONNECT = 0x10,
74 MQTT_FHDR_MSG_TYPE_CONNACK = 0x20,
75 MQTT_FHDR_MSG_TYPE_PUBLISH = 0x30,
76 MQTT_FHDR_MSG_TYPE_PUBACK = 0x40,
77 MQTT_FHDR_MSG_TYPE_PUBREC = 0x50,
78 MQTT_FHDR_MSG_TYPE_PUBREL = 0x60,
79 MQTT_FHDR_MSG_TYPE_PUBCOMP = 0x70,
80 MQTT_FHDR_MSG_TYPE_SUBSCRIBE = 0x80,
81 MQTT_FHDR_MSG_TYPE_SUBACK = 0x90,
82 MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE = 0xA0,
83 MQTT_FHDR_MSG_TYPE_UNSUBACK = 0xB0,
84 MQTT_FHDR_MSG_TYPE_PINGREQ = 0xC0,
85 MQTT_FHDR_MSG_TYPE_PINGRESP = 0xD0,
86 MQTT_FHDR_MSG_TYPE_DISCONNECT = 0xE0,
88 MQTT_FHDR_DUP_FLAG = 0x08,
90 MQTT_FHDR_QOS_LEVEL_0 = 0x00,
91 MQTT_FHDR_QOS_LEVEL_1 = 0x02,
92 MQTT_FHDR_QOS_LEVEL_2 = 0x04,
94 MQTT_FHDR_RETAIN_FLAG = 0x01,
98 MQTT_VHDR_USERNAME_FLAG = 0x80,
99 MQTT_VHDR_PASSWORD_FLAG = 0x40,
101 MQTT_VHDR_WILL_RETAIN_FLAG = 0x20,
102 MQTT_VHDR_WILL_QOS_LEVEL_0 = 0x00,
103 MQTT_VHDR_WILL_QOS_LEVEL_1 = 0x08,
104 MQTT_VHDR_WILL_QOS_LEVEL_2 = 0x10,
106 MQTT_VHDR_WILL_FLAG = 0x04,
107 MQTT_VHDR_CLEAN_SESSION_FLAG = 0x02,
108 } mqtt_vhdr_conn_fields_t;
111 MQTT_VHDR_CONN_ACCEPTED,
112 MQTT_VHDR_CONN_REJECTED_PROTOCOL,
113 MQTT_VHDR_CONN_REJECTED_IDENTIFIER,
114 MQTT_VHDR_CONN_REJECTED_UNAVAILABLE,
115 MQTT_VHDR_CONN_REJECTED_BAD_USER_PASS,
116 MQTT_VHDR_CONN_REJECTED_UNAUTHORIZED,
117 } mqtt_vhdr_connack_ret_code_t;
120 MQTT_VHDR_CONNACK_SESSION_PRESENT = 0x1
121 } mqtt_vhdr_connack_flags_t;
126 MQTT_SUBACK_RET_QOS_0 = 0x00,
127 MQTT_SUBACK_RET_QOS_1 = 0x01,
128 MQTT_SUBACK_RET_QOS_2 = 0x02,
129 MQTT_SUBACK_RET_FAIL = 0x08,
130 } mqtt_suback_ret_code_t;
147 #define MQTT_CONNECT_VHDR_SIZE 12 160 #define MQTT_CONNECT_VHDR_SIZE 10 163 #define MQTT_STRING_LEN_SIZE 2 164 #define MQTT_MID_SIZE 2 165 #define MQTT_QOS_SIZE 1 167 #define RESPONSE_WAIT_TIMEOUT (CLOCK_SECOND * 10) 169 #define INCREMENT_MID(conn) (conn)->mid_counter += 2 170 #define MQTT_STRING_LENGTH(s) (((s)->length) == 0 ? 0 : (MQTT_STRING_LEN_SIZE + (s)->length)) 173 #define PT_MQTT_WRITE_BYTES(conn, data, len) \ 174 conn->out_write_pos = 0; \ 175 while(write_bytes(conn, data, len)) { \ 176 PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \ 179 #define PT_MQTT_WRITE_BYTE(conn, data) \ 180 while(write_byte(conn, data)) { \ 181 PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \ 190 #define PT_MQTT_WAIT_SEND() \ 192 if (PROCESS_ERR_OK == \ 193 process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL)) { \ 195 PROCESS_WAIT_EVENT(); \ 196 if(ev == mqtt_abort_now_event) { \ 197 conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \ 198 PT_INIT(&conn->out_proto_thread); \ 199 process_post(PROCESS_CURRENT(), ev, data); \ 200 } else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \ 201 process_post(PROCESS_CURRENT(), ev, data); \ 203 } while (ev != mqtt_continue_send_event); \ 207 static process_event_t mqtt_do_connect_tcp_event;
208 static process_event_t mqtt_do_connect_mqtt_event;
209 static process_event_t mqtt_do_disconnect_mqtt_event;
210 static process_event_t mqtt_do_subscribe_event;
211 static process_event_t mqtt_do_unsubscribe_event;
212 static process_event_t mqtt_do_publish_event;
213 static process_event_t mqtt_do_pingreq_event;
214 static process_event_t mqtt_continue_send_event;
215 static process_event_t mqtt_abort_now_event;
216 process_event_t mqtt_update_event;
223 static process_event_t mqtt_event_min;
224 static process_event_t mqtt_event_max;
228 tcp_input(
struct tcp_socket *s,
void *ptr,
const uint8_t *input_data_ptr,
231 static void tcp_event(
struct tcp_socket *s,
void *ptr,
232 tcp_socket_event_t event);
234 static void reset_packet(
struct mqtt_in_packet *packet);
236 LIST(mqtt_conn_list);
238 PROCESS(mqtt_process,
"MQTT process");
241 call_event(
struct mqtt_connection *conn,
245 conn->event_callback(conn, event, data);
246 process_post(conn->app_process, mqtt_update_event, NULL);
250 reset_defaults(
struct mqtt_connection *conn)
252 conn->mid_counter = 1;
253 PT_INIT(&conn->out_proto_thread);
254 conn->waiting_for_pingresp = 0;
256 reset_packet(&conn->in_packet);
257 conn->out_buffer_sent = 0;
261 abort_connection(
struct mqtt_connection *conn)
263 conn->out_buffer_ptr = conn->out_buffer;
264 conn->out_queue_full = 0;
267 memset(&conn->out_packet, 0,
sizeof(conn->out_packet));
269 tcp_socket_close(&conn->socket);
270 tcp_socket_unregister(&conn->socket);
272 memset(&conn->socket, 0,
sizeof(conn->socket));
274 conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
278 connect_tcp(
struct mqtt_connection *conn)
280 conn->state = MQTT_CONN_STATE_TCP_CONNECTING;
282 reset_defaults(conn);
283 tcp_socket_register(&(conn->socket),
286 MQTT_TCP_INPUT_BUFF_SIZE,
288 MQTT_TCP_OUTPUT_BUFF_SIZE,
291 tcp_socket_connect(&(conn->socket), &(conn->server_ip), conn->server_port);
295 disconnect_tcp(
struct mqtt_connection *conn)
297 conn->state = MQTT_CONN_STATE_DISCONNECTING;
298 tcp_socket_close(&(conn->socket));
299 tcp_socket_unregister(&conn->socket);
301 memset(&conn->socket, 0,
sizeof(conn->socket));
305 send_out_buffer(
struct mqtt_connection *conn)
307 if(conn->out_buffer_ptr - conn->out_buffer == 0) {
308 conn->out_buffer_sent = 1;
311 conn->out_buffer_sent = 0;
313 DBG(
"MQTT - (send_out_buffer) Space used in buffer: %i\n",
314 conn->out_buffer_ptr - conn->out_buffer);
316 tcp_socket_send(&conn->socket, conn->out_buffer,
317 conn->out_buffer_ptr - conn->out_buffer);
321 string_to_mqtt_string(
struct mqtt_string *mqtt_string,
char *
string)
323 if(mqtt_string == NULL) {
326 mqtt_string->string = string;
329 mqtt_string->length = strlen(
string);
331 mqtt_string->length = 0;
336 write_byte(
struct mqtt_connection *conn, uint8_t data)
338 DBG(
"MQTT - (write_byte) buff_size: %i write: '%02X'\n",
339 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
342 if(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr == 0) {
343 send_out_buffer(conn);
347 *conn->out_buffer_ptr = data;
348 conn->out_buffer_ptr++;
353 write_bytes(
struct mqtt_connection *conn, uint8_t *data, uint16_t len)
355 uint16_t write_bytes;
357 MIN(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
358 len - conn->out_write_pos);
360 memcpy(conn->out_buffer_ptr, &data[conn->out_write_pos], write_bytes);
361 conn->out_write_pos += write_bytes;
362 conn->out_buffer_ptr += write_bytes;
364 DBG(
"MQTT - (write_bytes) len: %u write_pos: %lu\n", len,
365 conn->out_write_pos);
367 if(len - conn->out_write_pos == 0) {
368 conn->out_write_pos = 0;
371 send_out_buffer(conn);
372 return len - conn->out_write_pos;
377 encode_remaining_length(uint8_t *remaining_length,
378 uint8_t *remaining_length_bytes,
383 DBG(
"MQTT - Encoding length %lu\n", length);
385 *remaining_length_bytes = 0;
387 digit = length % 128;
388 length = length / 128;
390 digit = digit | 0x80;
393 remaining_length[*remaining_length_bytes] = digit;
394 (*remaining_length_bytes)++;
395 DBG(
"MQTT - Encode len digit '%u' length '%lu'\n", digit, length);
396 }
while(length > 0 && *remaining_length_bytes < 5);
397 DBG(
"MQTT - remaining_length_bytes %u\n", *remaining_length_bytes);
401 keep_alive_callback(
void *ptr)
403 struct mqtt_connection *conn = ptr;
405 DBG(
"MQTT - (keep_alive_callback) Called!\n");
408 if(conn->waiting_for_pingresp) {
409 PRINTF(
"MQTT - Disconnect due to no PINGRESP from broker.\n");
410 disconnect_tcp(conn);
414 process_post(&mqtt_process, mqtt_do_pingreq_event, conn);
418 reset_packet(
struct mqtt_in_packet *packet)
420 memset(packet, 0,
sizeof(
struct mqtt_in_packet));
421 packet->remaining_multiplier = 1;
425 PT_THREAD(connect_pt(
struct pt *pt,
struct mqtt_connection *conn))
429 DBG(
"MQTT - Sending CONNECT message...\n");
432 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_CONNECT;
433 conn->out_packet.remaining_length = 0;
434 conn->out_packet.remaining_length += MQTT_CONNECT_VHDR_SIZE;
435 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->client_id);
436 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.username);
437 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.password);
438 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.topic);
439 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.message);
440 encode_remaining_length(conn->out_packet.remaining_length_enc,
441 &conn->out_packet.remaining_length_enc_bytes,
442 conn->out_packet.remaining_length);
443 if(conn->out_packet.remaining_length_enc_bytes > 4) {
444 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
445 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
450 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
451 PT_MQTT_WRITE_BYTES(conn,
452 conn->out_packet.remaining_length_enc,
453 conn->out_packet.remaining_length_enc_bytes);
454 PT_MQTT_WRITE_BYTE(conn, 0);
455 PT_MQTT_WRITE_BYTE(conn, strlen(MQTT_PROTOCOL_NAME));
456 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)MQTT_PROTOCOL_NAME, strlen(MQTT_PROTOCOL_NAME));
457 PT_MQTT_WRITE_BYTE(conn, MQTT_PROTOCOL_VERSION);
458 PT_MQTT_WRITE_BYTE(conn, conn->connect_vhdr_flags);
459 PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive >> 8));
460 PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive & 0x00FF));
461 PT_MQTT_WRITE_BYTE(conn, conn->client_id.length >> 8);
462 PT_MQTT_WRITE_BYTE(conn, conn->client_id.length & 0x00FF);
463 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->client_id.string,
464 conn->client_id.length);
465 if(conn->connect_vhdr_flags & MQTT_VHDR_WILL_FLAG) {
466 PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length >> 8);
467 PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length & 0x00FF);
468 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.topic.string,
469 conn->will.topic.length);
470 PT_MQTT_WRITE_BYTE(conn, conn->will.message.length >> 8);
471 PT_MQTT_WRITE_BYTE(conn, conn->will.message.length & 0x00FF);
472 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.message.string,
473 conn->will.message.length);
474 DBG(
"MQTT - Setting will topic to '%s' %u bytes and message to '%s' %u bytes\n",
475 conn->will.topic.string,
476 conn->will.topic.length,
477 conn->will.message.string,
478 conn->will.message.length);
480 if(conn->connect_vhdr_flags & MQTT_VHDR_USERNAME_FLAG) {
481 PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length >> 8);
482 PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length & 0x00FF);
483 PT_MQTT_WRITE_BYTES(conn,
484 (uint8_t *)conn->credentials.username.string,
485 conn->credentials.username.length);
487 if(conn->connect_vhdr_flags & MQTT_VHDR_PASSWORD_FLAG) {
488 PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length >> 8);
489 PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length & 0x00FF);
490 PT_MQTT_WRITE_BYTES(conn,
491 (uint8_t *)conn->credentials.password.string,
492 conn->credentials.password.length);
496 send_out_buffer(conn);
497 conn->state = MQTT_CONN_STATE_CONNECTING_TO_BROKER;
499 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
502 reset_packet(&conn->in_packet);
503 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
506 DBG(
"Timeout waiting for CONNACK\n");
510 reset_packet(&conn->in_packet);
512 DBG(
"MQTT - Done sending CONNECT\n");
515 DBG(
"MQTT - CONNECT message sent: \n");
517 for(i = 0; i < (conn->out_buffer_ptr - conn->out_buffer); i++) {
518 DBG(
"%02X ", conn->out_buffer[i]);
527 PT_THREAD(disconnect_pt(
struct pt *pt,
struct mqtt_connection *conn))
531 PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_DISCONNECT);
532 PT_MQTT_WRITE_BYTE(conn, 0);
534 send_out_buffer(conn);
548 PT_THREAD(subscribe_pt(
struct pt *pt,
struct mqtt_connection *conn))
552 DBG(
"MQTT - Sending subscribe message! topic %s topic_length %i\n",
553 conn->out_packet.topic,
554 conn->out_packet.topic_length);
555 DBG(
"MQTT - Buffer space is %i \n",
556 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
559 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_SUBSCRIBE | MQTT_FHDR_QOS_LEVEL_1;
560 conn->out_packet.remaining_length = MQTT_MID_SIZE +
561 MQTT_STRING_LEN_SIZE +
562 conn->out_packet.topic_length +
564 encode_remaining_length(conn->out_packet.remaining_length_enc,
565 &conn->out_packet.remaining_length_enc_bytes,
566 conn->out_packet.remaining_length);
567 if(conn->out_packet.remaining_length_enc_bytes > 4) {
568 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
569 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
574 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
575 PT_MQTT_WRITE_BYTES(conn,
576 conn->out_packet.remaining_length_enc,
577 conn->out_packet.remaining_length_enc_bytes);
579 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
580 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
582 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
583 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
584 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
585 conn->out_packet.topic_length);
586 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.qos);
589 send_out_buffer(conn);
590 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
593 reset_packet(&conn->in_packet);
594 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
598 DBG(
"Timeout waiting for SUBACK\n");
600 reset_packet(&conn->in_packet);
603 conn->out_queue_full = 0;
605 DBG(
"MQTT - Done in send_subscribe!\n");
611 PT_THREAD(unsubscribe_pt(
struct pt *pt,
struct mqtt_connection *conn))
615 DBG(
"MQTT - Sending unsubscribe message on topic %s topic_length %i\n",
616 conn->out_packet.topic,
617 conn->out_packet.topic_length);
618 DBG(
"MQTT - Buffer space is %i \n",
619 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
622 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE |
623 MQTT_FHDR_QOS_LEVEL_1;
624 conn->out_packet.remaining_length = MQTT_MID_SIZE +
625 MQTT_STRING_LEN_SIZE +
626 conn->out_packet.topic_length;
627 encode_remaining_length(conn->out_packet.remaining_length_enc,
628 &conn->out_packet.remaining_length_enc_bytes,
629 conn->out_packet.remaining_length);
630 if(conn->out_packet.remaining_length_enc_bytes > 4) {
631 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
632 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
637 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
638 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
639 conn->out_packet.remaining_length_enc_bytes);
641 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
642 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
644 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
645 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
646 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
647 conn->out_packet.topic_length);
650 send_out_buffer(conn);
651 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
654 reset_packet(&conn->in_packet);
655 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
659 DBG(
"Timeout waiting for UNSUBACK\n");
662 reset_packet(&conn->in_packet);
665 conn->out_queue_full = 0;
667 DBG(
"MQTT - Done writing subscribe message to out buffer!\n");
673 PT_THREAD(publish_pt(
struct pt *pt,
struct mqtt_connection *conn))
677 DBG(
"MQTT - Sending publish message! topic %s topic_length %i\n",
678 conn->out_packet.topic,
679 conn->out_packet.topic_length);
680 DBG(
"MQTT - Buffer space is %i \n",
681 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
684 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_PUBLISH |
685 conn->out_packet.qos << 1;
686 if(conn->out_packet.retain == MQTT_RETAIN_ON) {
687 conn->out_packet.fhdr |= MQTT_FHDR_RETAIN_FLAG;
689 conn->out_packet.remaining_length = MQTT_STRING_LEN_SIZE +
690 conn->out_packet.topic_length +
691 conn->out_packet.payload_size;
692 if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
693 conn->out_packet.remaining_length += MQTT_MID_SIZE;
695 encode_remaining_length(conn->out_packet.remaining_length_enc,
696 &conn->out_packet.remaining_length_enc_bytes,
697 conn->out_packet.remaining_length);
698 if(conn->out_packet.remaining_length_enc_bytes > 4) {
699 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
700 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
705 if(conn->out_packet.qos == MQTT_QOS_LEVEL_0) {
706 conn->out_packet.fhdr &= ~MQTT_FHDR_DUP_FLAG;
710 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
711 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
712 conn->out_packet.remaining_length_enc_bytes);
714 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
715 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
716 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
717 conn->out_packet.topic_length);
718 if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
719 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
720 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
724 PT_MQTT_WRITE_BYTES(conn,
725 conn->out_packet.payload,
726 conn->out_packet.payload_size);
728 send_out_buffer(conn);
729 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
737 if(conn->out_packet.qos == 0) {
738 process_post(conn->app_process, mqtt_update_event, NULL);
739 }
else if(conn->out_packet.qos == 1) {
741 reset_packet(&conn->in_packet);
742 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
745 DBG(
"Timeout waiting for PUBACK\n");
747 if(conn->in_packet.mid != conn->out_packet.mid) {
748 DBG(
"MQTT - Warning, got PUBACK with none matching MID. Currently there " 749 "is no support for several concurrent PUBLISH messages.\n");
751 }
else if(conn->out_packet.qos == 2) {
752 DBG(
"MQTT - QoS not implemented yet.\n");
756 reset_packet(&conn->in_packet);
759 conn->out_queue_full = 0;
761 DBG(
"MQTT - Publish Enqueued\n");
767 PT_THREAD(pingreq_pt(
struct pt *pt,
struct mqtt_connection *conn))
771 DBG(
"MQTT - Sending PINGREQ\n");
774 PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_PINGREQ);
775 PT_MQTT_WRITE_BYTE(conn, 0);
777 send_out_buffer(conn);
780 conn->waiting_for_pingresp = 1;
783 reset_packet(&conn->in_packet);
784 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
788 reset_packet(&conn->in_packet);
790 conn->waiting_for_pingresp = 0;
796 handle_connack(
struct mqtt_connection *conn)
798 mqtt_connack_event_t connack_event;
800 DBG(
"MQTT - Got CONNACK\n");
802 if(conn->in_packet.payload[1] != 0) {
803 PRINTF(
"MQTT - Connection refused with Return Code %i\n",
804 conn->in_packet.payload[1]);
806 MQTT_EVENT_CONNECTION_REFUSED_ERROR,
807 &conn->in_packet.payload[1]);
808 abort_connection(conn);
812 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
814 #if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_3_1_1 815 connack_event.session_present = conn->in_packet.payload[0] & MQTT_VHDR_CONNACK_SESSION_PRESENT;
819 keep_alive_callback, conn);
822 conn->state = MQTT_CONN_STATE_CONNECTED_TO_BROKER;
823 call_event(conn, MQTT_EVENT_CONNECTED, &connack_event);
827 handle_pingresp(
struct mqtt_connection *conn)
829 DBG(
"MQTT - Got PINGRESP\n");
833 handle_suback(
struct mqtt_connection *conn)
835 mqtt_suback_event_t suback_event;
837 DBG(
"MQTT - Got SUBACK\n");
840 if(conn->in_packet.remaining_length > MQTT_MID_SIZE +
841 MQTT_MAX_TOPICS_PER_SUBSCRIBE * MQTT_QOS_SIZE) {
842 DBG(
"MQTT - Error, SUBACK with > 1 topic, not supported.\n");
845 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
847 suback_event.mid = (conn->in_packet.payload[0] << 8) |
848 (conn->in_packet.payload[1]);
849 conn->in_packet.mid = suback_event.mid;
852 suback_event.success = 0;
854 switch(conn->in_packet.payload[2]) {
855 case MQTT_SUBACK_RET_FAIL:
856 PRINTF(
"MQTT - Error, SUBSCRIBE failed with SUBACK return code '%x'", conn->in_packet.payload[2]);
859 case MQTT_SUBACK_RET_QOS_0:
860 case MQTT_SUBACK_RET_QOS_1:
861 case MQTT_SUBACK_RET_QOS_2:
862 suback_event.qos_level = conn->in_packet.payload[2] & 0x03;
863 suback_event.success = 1;
867 PRINTF(
"MQTT - Error, Unrecognised SUBACK return code '%x'", conn->in_packet.payload[2]);
871 suback_event.return_code = conn->in_packet.payload[2];
873 suback_event.qos_level = conn->in_packet.payload[2];
876 if(conn->in_packet.mid != conn->out_packet.mid) {
877 DBG(
"MQTT - Warning, got SUBACK with none matching MID. Currently there is" 878 "no support for several concurrent SUBSCRIBE messages.\n");
882 call_event(conn, MQTT_EVENT_SUBACK, &suback_event);
886 handle_unsuback(
struct mqtt_connection *conn)
888 DBG(
"MQTT - Got UNSUBACK\n");
890 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
891 conn->in_packet.mid = (conn->in_packet.payload[0] << 8) |
892 (conn->in_packet.payload[1]);
894 if(conn->in_packet.mid != conn->out_packet.mid) {
895 DBG(
"MQTT - Warning, got UNSUBACK with none matching MID. Currently there is" 896 "no support for several concurrent UNSUBSCRIBE messages.\n");
899 call_event(conn, MQTT_EVENT_UNSUBACK, &conn->in_packet.mid);
903 handle_puback(
struct mqtt_connection *conn)
905 DBG(
"MQTT - Got PUBACK\n");
907 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
908 conn->in_packet.mid = (conn->in_packet.payload[0] << 8) |
909 (conn->in_packet.payload[1]);
911 call_event(conn, MQTT_EVENT_PUBACK, &conn->in_packet.mid);
914 static mqtt_pub_status_t
915 handle_publish(
struct mqtt_connection *conn)
917 DBG(
"MQTT - Got PUBLISH, called once per manageable chunk of message.\n");
918 DBG(
"MQTT - Handling publish on topic '%s'\n", conn->in_publish_msg.topic);
920 #if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_3_1_1 921 if(strlen(conn->in_publish_msg.topic) < conn->in_packet.topic_len) {
922 DBG(
"NULL detected in received PUBLISH topic\n");
924 return MQTT_PUBLISH_ERR;
928 DBG(
"MQTT - This chunk is %i bytes\n", conn->in_packet.payload_pos);
930 if(((conn->in_packet.fhdr & 0x09) >> 1) > 0) {
931 PRINTF(
"MQTT - Error, got incoming PUBLISH with QoS > 0, not supported atm!\n");
934 call_event(conn, MQTT_EVENT_PUBLISH, &conn->in_publish_msg);
936 if(conn->in_publish_msg.first_chunk == 1) {
937 conn->in_publish_msg.first_chunk = 0;
941 if(conn->in_publish_msg.payload_left == 0) {
946 DBG(
"MQTT - (handle_publish) resetting packet.\n");
947 reset_packet(&conn->in_packet);
950 return MQTT_PUBLISH_OK;
954 parse_publish_vhdr(
struct mqtt_connection *conn,
956 const uint8_t *input_data_ptr,
962 if(conn->in_packet.topic_len_received == 0) {
963 conn->in_packet.topic_pos = 0;
964 conn->in_packet.topic_len = (input_data_ptr[(*pos)++] << 8);
965 conn->in_packet.byte_counter++;
966 if(*pos >= input_data_len) {
969 conn->in_packet.topic_len |= input_data_ptr[(*pos)++];
970 conn->in_packet.byte_counter++;
971 conn->in_packet.topic_len_received = 1;
973 if(conn->in_packet.topic_len > MQTT_MAX_TOPIC_LENGTH) {
974 DBG(
"MQTT - topic too long %u/%u\n", conn->in_packet.topic_len, MQTT_MAX_TOPIC_LENGTH);
977 DBG(
"MQTT - Read PUBLISH topic len %i\n", conn->in_packet.topic_len);
982 if(conn->in_packet.topic_len_received == 1 &&
983 conn->in_packet.topic_received == 0) {
984 copy_bytes = MIN(conn->in_packet.topic_len - conn->in_packet.topic_pos,
985 input_data_len - *pos);
986 DBG(
"MQTT - topic_pos: %i copy_bytes: %i", conn->in_packet.topic_pos,
988 memcpy(&conn->in_publish_msg.topic[conn->in_packet.topic_pos],
989 &input_data_ptr[*pos],
991 (*pos) += copy_bytes;
992 conn->in_packet.byte_counter += copy_bytes;
993 conn->in_packet.topic_pos += copy_bytes;
995 if(conn->in_packet.topic_len - conn->in_packet.topic_pos == 0) {
996 DBG(
"MQTT - Got topic '%s'", conn->in_publish_msg.topic);
997 conn->in_packet.topic_received = 1;
998 conn->in_publish_msg.topic[conn->in_packet.topic_pos] =
'\0';
999 conn->in_publish_msg.payload_length =
1000 conn->in_packet.remaining_length - conn->in_packet.topic_len - 2;
1001 conn->in_publish_msg.payload_left = conn->in_publish_msg.payload_length;
1005 conn->in_publish_msg.first_chunk = 1;
1010 tcp_input(
struct tcp_socket *s,
1012 const uint8_t *input_data_ptr,
1015 struct mqtt_connection *conn = ptr;
1017 uint32_t copy_bytes = 0;
1019 mqtt_pub_status_t pub_status;
1021 if(input_data_len == 0) {
1025 if(conn->in_packet.packet_received) {
1026 reset_packet(&conn->in_packet);
1029 DBG(
"tcp_input with %i bytes of data:\n", input_data_len);
1032 if(!conn->in_packet.fhdr) {
1033 conn->in_packet.fhdr = input_data_ptr[pos++];
1034 conn->in_packet.byte_counter++;
1036 DBG(
"MQTT - Read VHDR '%02X'\n", conn->in_packet.fhdr);
1038 if(pos >= input_data_len) {
1044 if(!conn->in_packet.has_remaining_length) {
1046 if(pos >= input_data_len) {
1050 byte = input_data_ptr[pos++];
1051 conn->in_packet.byte_counter++;
1052 conn->in_packet.remaining_length_bytes++;
1053 DBG(
"MQTT - Read Remaining Length byte\n");
1055 if(conn->in_packet.byte_counter > 5) {
1056 call_event(conn, MQTT_EVENT_ERROR, NULL);
1057 DBG(
"Received more then 4 byte 'remaining lenght'.");
1061 conn->in_packet.remaining_length +=
1062 (byte & 127) * conn->in_packet.remaining_multiplier;
1063 conn->in_packet.remaining_multiplier *= 128;
1064 }
while((byte & 128) != 0);
1066 DBG(
"MQTT - Finished reading remaining length byte\n");
1067 conn->in_packet.has_remaining_length = 1;
1076 if((conn->in_packet.remaining_length > MQTT_INPUT_BUFF_SIZE) &&
1077 (conn->in_packet.fhdr & 0xF0) != MQTT_FHDR_MSG_TYPE_PUBLISH) {
1079 PRINTF(
"MQTT - Error, unsupported payload size for non-PUBLISH message\n");
1081 conn->in_packet.byte_counter += input_data_len;
1082 if(conn->in_packet.byte_counter >=
1083 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
1084 conn->in_packet.packet_received = 1;
1095 while(conn->in_packet.byte_counter <
1096 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
1098 if((conn->in_packet.fhdr & 0xF0) == MQTT_FHDR_MSG_TYPE_PUBLISH &&
1099 conn->in_packet.topic_received == 0) {
1100 parse_publish_vhdr(conn, &pos, input_data_ptr, input_data_len);
1104 copy_bytes = MIN(input_data_len - pos,
1105 MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos);
1106 DBG(
"- Copied %lu payload bytes\n", copy_bytes);
1107 memcpy(&conn->in_packet.payload[conn->in_packet.payload_pos],
1108 &input_data_ptr[pos],
1110 conn->in_packet.byte_counter += copy_bytes;
1111 conn->in_packet.payload_pos += copy_bytes;
1115 DBG(
"MQTT - Copied bytes: \n");
1116 for(i = 0; i < copy_bytes; i++) {
1117 DBG(
"%02X ", conn->in_packet.payload[i]);
1122 if(MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos == 0) {
1123 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1124 conn->in_publish_msg.payload_chunk_length = MQTT_INPUT_BUFF_SIZE;
1125 conn->in_publish_msg.payload_left -= MQTT_INPUT_BUFF_SIZE;
1127 pub_status = handle_publish(conn);
1129 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1130 conn->in_packet.payload_pos = 0;
1132 if(pub_status != MQTT_PUBLISH_OK) {
1137 if(pos >= input_data_len &&
1138 (conn->in_packet.byte_counter < (MQTT_FHDR_SIZE + conn->in_packet.remaining_length))) {
1146 DBG(
"MQTT - Finished reading packet!\n");
1148 DBG(
"MQTT - total data was %i bytes of data. \n",
1149 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length));
1152 switch(conn->in_packet.fhdr & 0xF0) {
1153 case MQTT_FHDR_MSG_TYPE_CONNACK:
1154 handle_connack(conn);
1156 case MQTT_FHDR_MSG_TYPE_PUBLISH:
1158 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1159 conn->in_publish_msg.payload_chunk_length = conn->in_packet.payload_pos;
1160 conn->in_publish_msg.payload_left = 0;
1161 (void) handle_publish(conn);
1163 case MQTT_FHDR_MSG_TYPE_PUBACK:
1164 handle_puback(conn);
1166 case MQTT_FHDR_MSG_TYPE_SUBACK:
1167 handle_suback(conn);
1169 case MQTT_FHDR_MSG_TYPE_UNSUBACK:
1170 handle_unsuback(conn);
1172 case MQTT_FHDR_MSG_TYPE_PINGRESP:
1173 handle_pingresp(conn);
1177 case MQTT_FHDR_MSG_TYPE_PUBREC:
1178 case MQTT_FHDR_MSG_TYPE_PUBREL:
1179 case MQTT_FHDR_MSG_TYPE_PUBCOMP:
1180 call_event(conn, MQTT_EVENT_NOT_IMPLEMENTED_ERROR, NULL);
1181 PRINTF(
"MQTT - Got unhandled MQTT Message Type '%i'",
1182 (conn->in_packet.fhdr & 0xF0));
1187 PRINTF(
"MQTT - Got MQTT Message Type '%i'", (conn->in_packet.fhdr & 0xF0));
1191 conn->in_packet.packet_received = 1;
1200 tcp_event(
struct tcp_socket *s,
void *ptr, tcp_socket_event_t event)
1202 struct mqtt_connection *conn = ptr;
1208 case TCP_SOCKET_CLOSED:
1209 case TCP_SOCKET_TIMEDOUT:
1210 case TCP_SOCKET_ABORTED: {
1212 DBG(
"MQTT - Disconnected by tcp event %d\n", event);
1213 process_post(&mqtt_process, mqtt_abort_now_event, conn);
1214 conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
1216 call_event(conn, MQTT_EVENT_DISCONNECTED, &event);
1217 abort_connection(conn);
1220 if(conn->auto_reconnect == 1) {
1225 case TCP_SOCKET_CONNECTED: {
1226 conn->state = MQTT_CONN_STATE_TCP_CONNECTED;
1227 conn->out_buffer_sent = 1;
1229 process_post(&mqtt_process, mqtt_do_connect_mqtt_event, conn);
1232 case TCP_SOCKET_DATA_SENT: {
1233 DBG(
"MQTT - Got TCP_DATA_SENT\n");
1235 if(conn->socket.output_data_len == 0) {
1236 conn->out_buffer_sent = 1;
1237 conn->out_buffer_ptr = conn->out_buffer;
1245 DBG(
"MQTT - TCP Event %d is currently not managed by the tcp event callback\n",
1253 static struct mqtt_connection *conn;
1260 if(ev == mqtt_abort_now_event) {
1261 DBG(
"MQTT - Abort\n");
1263 conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE;
1265 abort_connection(conn);
1267 if(ev == mqtt_do_connect_tcp_event) {
1269 DBG(
"MQTT - Got mqtt_do_connect_tcp_event!\n");
1272 if(ev == mqtt_do_connect_mqtt_event) {
1274 conn->socket.output_data_max_seg = conn->max_segment_size;
1275 DBG(
"MQTT - Got mqtt_do_connect_mqtt_event!\n");
1277 if(conn->out_buffer_sent == 1) {
1278 PT_INIT(&conn->out_proto_thread);
1279 while(connect_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1280 conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) {
1281 PT_MQTT_WAIT_SEND();
1285 if(ev == mqtt_do_disconnect_mqtt_event) {
1287 DBG(
"MQTT - Got mqtt_do_disconnect_mqtt_event!\n");
1290 if(conn->state == MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT) {
1291 if(conn->out_buffer_sent == 1) {
1292 PT_INIT(&conn->out_proto_thread);
1293 while(conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE &&
1294 disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1295 PT_MQTT_WAIT_SEND();
1297 abort_connection(conn);
1298 call_event(conn, MQTT_EVENT_DISCONNECTED, &ev);
1300 process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
1304 if(ev == mqtt_do_pingreq_event) {
1306 DBG(
"MQTT - Got mqtt_do_pingreq_event!\n");
1308 if(conn->out_buffer_sent == 1 &&
1309 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1310 PT_INIT(&conn->out_proto_thread);
1311 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1312 pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1313 PT_MQTT_WAIT_SEND();
1317 if(ev == mqtt_do_subscribe_event) {
1319 DBG(
"MQTT - Got mqtt_do_subscribe_mqtt_event!\n");
1321 if(conn->out_buffer_sent == 1 &&
1322 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1323 PT_INIT(&conn->out_proto_thread);
1324 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1325 subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1326 PT_MQTT_WAIT_SEND();
1330 if(ev == mqtt_do_unsubscribe_event) {
1332 DBG(
"MQTT - Got mqtt_do_unsubscribe_mqtt_event!\n");
1334 if(conn->out_buffer_sent == 1 &&
1335 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1336 PT_INIT(&conn->out_proto_thread);
1337 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1338 unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1339 PT_MQTT_WAIT_SEND();
1343 if(ev == mqtt_do_publish_event) {
1345 DBG(
"MQTT - Got mqtt_do_publish_mqtt_event!\n");
1347 if(conn->out_buffer_sent == 1 &&
1348 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1349 PT_INIT(&conn->out_proto_thread);
1350 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1351 publish_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1352 PT_MQTT_WAIT_SEND();
1363 static uint8_t inited = 0;
1366 mqtt_event_min = mqtt_do_connect_tcp_event;
1376 mqtt_event_max = mqtt_abort_now_event;
1389 uint16_t max_segment_size)
1391 #if MQTT_31 || !MQTT_SRV_SUPPORTS_EMPTY_CLIENT_ID 1392 if(strlen(client_id) < 1) {
1393 return MQTT_STATUS_INVALID_ARGS_ERROR;
1398 memset(conn, 0,
sizeof(
struct mqtt_connection));
1399 string_to_mqtt_string(&conn->client_id, client_id);
1400 conn->event_callback = event_callback;
1401 conn->app_process = app_process;
1402 conn->auto_reconnect = 1;
1403 conn->max_segment_size = max_segment_size;
1404 reset_defaults(conn);
1409 DBG(
"MQTT - Registered successfully\n");
1411 return MQTT_STATUS_OK;
1421 uint16_t keep_alive, uint8_t clean_session)
1423 uip_ip6addr_t ip6addr;
1428 if(conn->state > MQTT_CONN_STATE_NOT_CONNECTED) {
1429 return MQTT_STATUS_OK;
1432 conn->server_host = host;
1433 conn->keep_alive = keep_alive;
1434 conn->server_port = port;
1435 conn->out_buffer_ptr = conn->out_buffer;
1436 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1439 if(clean_session || (conn->client_id.length == 0)) {
1440 conn->connect_vhdr_flags |= MQTT_VHDR_CLEAN_SESSION_FLAG;
1444 if(uiplib_ip6addrconv(host, &ip6addr) == 0) {
1445 return MQTT_STATUS_ERROR;
1455 process_post(&mqtt_process, mqtt_do_connect_tcp_event, conn);
1457 return MQTT_STATUS_OK;
1463 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1467 conn->state = MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT;
1469 process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
1474 mqtt_qos_level_t qos_level)
1476 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1477 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1480 DBG(
"MQTT - Call to mqtt_subscribe...\n");
1483 if(conn->out_queue_full) {
1484 DBG(
"MQTT - Not accepted!\n");
1485 return MQTT_STATUS_OUT_QUEUE_FULL;
1487 conn->out_queue_full = 1;
1488 DBG(
"MQTT - Accepted!\n");
1490 conn->out_packet.mid = INCREMENT_MID(conn);
1491 conn->out_packet.topic = topic;
1492 conn->out_packet.topic_length = strlen(topic);
1493 conn->out_packet.qos = qos_level;
1494 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1496 process_post(&mqtt_process, mqtt_do_subscribe_event, conn);
1497 return MQTT_STATUS_OK;
1503 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1504 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1507 DBG(
"MQTT - Call to mqtt_unsubscribe...\n");
1509 if(conn->out_queue_full) {
1510 DBG(
"MQTT - Not accepted!\n");
1511 return MQTT_STATUS_OUT_QUEUE_FULL;
1513 conn->out_queue_full = 1;
1514 DBG(
"MQTT - Accepted!\n");
1516 conn->out_packet.mid = INCREMENT_MID(conn);
1517 conn->out_packet.topic = topic;
1518 conn->out_packet.topic_length = strlen(topic);
1519 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1521 process_post(&mqtt_process, mqtt_do_unsubscribe_event, conn);
1522 return MQTT_STATUS_OK;
1527 uint8_t *payload, uint32_t payload_size,
1528 mqtt_qos_level_t qos_level, mqtt_retain_t retain)
1530 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1531 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1534 DBG(
"MQTT - Call to mqtt_publish...\n");
1537 if(conn->out_queue_full) {
1538 DBG(
"MQTT - Not accepted!\n");
1539 return MQTT_STATUS_OUT_QUEUE_FULL;
1541 conn->out_queue_full = 1;
1542 DBG(
"MQTT - Accepted!\n");
1544 conn->out_packet.mid = INCREMENT_MID(conn);
1545 conn->out_packet.retain = retain;
1546 conn->out_packet.topic = topic;
1547 conn->out_packet.topic_length = strlen(topic);
1548 conn->out_packet.payload = payload;
1549 conn->out_packet.payload_size = payload_size;
1550 conn->out_packet.qos = qos_level;
1551 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1553 process_post(&mqtt_process, mqtt_do_publish_event, conn);
1554 return MQTT_STATUS_OK;
1562 string_to_mqtt_string(&conn->credentials.username, username);
1563 string_to_mqtt_string(&conn->credentials.password, password);
1566 if(username != NULL) {
1567 conn->connect_vhdr_flags |= MQTT_VHDR_USERNAME_FLAG;
1569 conn->connect_vhdr_flags &= ~MQTT_VHDR_USERNAME_FLAG;
1571 if(password != NULL) {
1572 conn->connect_vhdr_flags |= MQTT_VHDR_PASSWORD_FLAG;
1574 conn->connect_vhdr_flags &= ~MQTT_VHDR_PASSWORD_FLAG;
1580 mqtt_qos_level_t qos)
1583 string_to_mqtt_string(&conn->will.topic, topic);
1584 string_to_mqtt_string(&conn->will.message, message);
1587 conn->will.qos = qos;
1590 conn->connect_vhdr_flags |= MQTT_VHDR_WILL_FLAG |
1591 MQTT_VHDR_WILL_RETAIN_FLAG;
mqtt_status_t mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic, mqtt_qos_level_t qos_level)
Subscribes to a MQTT topic.
static uip_ipaddr_t ipaddr
Pointer to prefix information option in uip_buf.
mqtt_event_t
MQTT engine events.
void timer_set(struct timer *t, clock_time_t interval)
Set a timer.
#define PROCESS(name, strname)
Declare a process.
Protothreads implementation.
void ctimer_stop(struct ctimer *c)
Stop a pending callback timer.
#define PROCESS_WAIT_EVENT()
Wait for an event to be posted to the process.
#define PROCESS_BEGIN()
Define the beginning of a process.
mqtt_status_t mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic, uint8_t *payload, uint32_t payload_size, mqtt_qos_level_t qos_level, mqtt_retain_t retain)
Publish to a MQTT topic.
#define PROCESS_END()
Define the end of a process.
#define PT_BEGIN(pt)
Declare the start of a protothread inside the C function implementing the protothread.
#define PT_WAIT_UNTIL(pt, condition)
Block and wait until condition is true.
void(* mqtt_event_callback_t)(struct mqtt_connection *m, mqtt_event_t event, void *data)
MQTT event callback function.
Header file for IPv6-related data structures.
#define PT_INIT(pt)
Initialize a protothread.
#define CLOCK_SECOND
A second, measured in system clock time.
Header file for the callback timer
#define PT_END(pt)
Declare the end of a protothread.
mqtt_status_t mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic)
Unsubscribes from a MQTT topic.
Linked list manipulation routines.
void mqtt_set_username_password(struct mqtt_connection *conn, char *username, char *password)
Set the user name and password for a MQTT client.
Header file for the Contiki MQTT engine.
void ctimer_set(struct ctimer *c, clock_time_t t, void(*f)(void *), void *ptr)
Set a callback timer.
int timer_expired(struct timer *t)
Check if a timer has expired.
#define uip_ipaddr_copy(dest, src)
Copy an IP address from one place to another.
#define PT_EXIT(pt)
Exit the protothread.
void ctimer_restart(struct ctimer *c)
Restart a callback timer from the current point in time.
#define PT_THREAD(name_args)
Declaration of a protothread.
process_event_t process_alloc_event(void)
Allocate a global event number.
void list_add(list_t list, void *item)
Add an item at the end of a list.
void list_init(list_t list)
Initialize a list.
Header file for the uIP TCP/IP stack.
#define LIST(name)
Declare a linked list.
mqtt_status_t mqtt_register(struct mqtt_connection *conn, struct process *app_process, char *client_id, mqtt_event_callback_t event_callback, uint16_t max_segment_size)
Initializes the MQTT engine.
int process_post(struct process *p, process_event_t ev, process_data_t data)
Post an asynchronous event.
void mqtt_disconnect(struct mqtt_connection *conn)
Disconnects from a MQTT broker.
Default definitions of C compiler quirk work-arounds.
PROCESS_THREAD(cc2538_rf_process, ev, data)
Implementation of the cc2538 RF driver process.
void mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message, mqtt_qos_level_t qos)
Set the last will topic and message for a MQTT client.
Header file for the LED HAL.
mqtt_status_t mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port, uint16_t keep_alive, uint8_t clean_session)
Connects to a MQTT broker.
void process_start(struct process *p, process_data_t data)
Start a process.