44 #include "mqtt-prop.h" 46 #include "contiki-net.h" 47 #include "contiki-lib.h" 48 #include "lib/random.h" 56 #include "tcp-socket.h" 58 #include "lib/assert.h" 69 #define PRINTF(...) printf(__VA_ARGS__) 75 MQTT_FHDR_DUP_FLAG = 0x08,
77 MQTT_FHDR_QOS_LEVEL_0 = 0x00,
78 MQTT_FHDR_QOS_LEVEL_1 = 0x02,
79 MQTT_FHDR_QOS_LEVEL_2 = 0x04,
81 MQTT_FHDR_RETAIN_FLAG = 0x01,
/*
 * CONNECT variable-header flag masks (mqtt_vhdr_conn_fields_t).
 * connect_pt writes conn->connect_vhdr_flags as a single byte and tests it
 * against the WILL, USERNAME and PASSWORD masks to decide which optional
 * fields follow the client id.
 */
85 MQTT_VHDR_USERNAME_FLAG = 0x80,
86 MQTT_VHDR_PASSWORD_FLAG = 0x40,
88 MQTT_VHDR_WILL_RETAIN_FLAG = 0x20,
89 MQTT_VHDR_WILL_QOS_LEVEL_0 = 0x00,
90 MQTT_VHDR_WILL_QOS_LEVEL_1 = 0x08,
91 MQTT_VHDR_WILL_QOS_LEVEL_2 = 0x10,
93 MQTT_VHDR_WILL_FLAG = 0x04,
94 MQTT_VHDR_CLEAN_SESSION_FLAG = 0x02,
95 } mqtt_vhdr_conn_fields_t;
/*
 * CONNACK return codes (mqtt_vhdr_connack_ret_code_t), numbered implicitly.
 * Presumably MQTT_VHDR_CONN_ACCEPTED is the first enumerator (value 0) - the
 * opening line of the enum is not visible here. handle_connack treats any
 * non-zero value in payload[1] as a refused connection.
 */
98 MQTT_VHDR_CONN_ACCEPTED,
99 MQTT_VHDR_CONN_REJECTED_PROTOCOL,
100 MQTT_VHDR_CONN_REJECTED_IDENTIFIER,
101 MQTT_VHDR_CONN_REJECTED_UNAVAILABLE,
102 MQTT_VHDR_CONN_REJECTED_BAD_USER_PASS,
103 MQTT_VHDR_CONN_REJECTED_UNAUTHORIZED,
104 } mqtt_vhdr_connack_ret_code_t;
/*
 * Bit mask for the first CONNACK payload byte; handle_connack ANDs it with
 * in_packet.payload[0] to fill connack_event.session_present.
 */
107 MQTT_VHDR_CONNACK_SESSION_PRESENT = 0x1
108 } mqtt_vhdr_connack_flags_t;
/*
 * SUBACK return codes (mqtt_suback_ret_code_t); handle_suback switches on
 * in_packet.payload_start[0] using these values.
 * NOTE(review): the MQTT 3.1.1 specification lists the SUBACK failure return
 * code as 0x80; the 0x08 used for MQTT_SUBACK_RET_FAIL looks like a
 * transposition - confirm against the spec and the broker in use.
 */
112 MQTT_SUBACK_RET_QOS_0 = 0x00,
113 MQTT_SUBACK_RET_QOS_1 = 0x01,
114 MQTT_SUBACK_RET_QOS_2 = 0x02,
115 MQTT_SUBACK_RET_FAIL = 0x08,
116 } mqtt_suback_ret_code_t;
/*
 * Reason code values (mqtt_reason_code_t). Codes below 0x80 come first,
 * codes from 0x80 upwards follow. Within this part of the file only
 * MQTT_VHDR_RC_CONTINUE_AUTH is referenced (by handle_auth).
 * NOTE(review): MQTT_VHDR_RC_SUCCES_OR_NORMAL and
 * MQTT_VHDR_RC_PROT_VER_UNUSUPPORTED look misspelled; renaming would affect
 * any users outside this view - check before touching.
 */
120 MQTT_VHDR_RC_SUCCES_OR_NORMAL = 0x00,
121 MQTT_VHDR_RC_QOS_0 = 0x01,
122 MQTT_VHDR_RC_QOS_1 = 0x02,
123 MQTT_VHDR_RC_QOS_2 = 0x03,
124 MQTT_VHDR_RC_DISC_WITH_WILL = 0x04,
125 MQTT_VHDR_RC_NO_MATCH_SUB = 0x10,
126 MQTT_VHDR_RC_NO_SUB_EXISTED = 0x11,
127 MQTT_VHDR_RC_CONTINUE_AUTH = 0x18,
128 MQTT_VHDR_RC_REAUTH = 0x19,
129 MQTT_VHDR_RC_UNSPEC_ERR = 0x80,
130 MQTT_VHDR_RC_MALFORMED_PKT = 0x81,
131 MQTT_VHDR_RC_PROTOCOL_ERR = 0x82,
132 MQTT_VHDR_RC_IMPL_SPEC_ERR = 0x83,
133 MQTT_VHDR_RC_PROT_VER_UNUSUPPORTED = 0x84,
134 MQTT_VHDR_RC_CLIENT_ID_INVALID = 0x85,
135 MQTT_VHDR_RC_BAD_USER_PASS = 0x86,
136 MQTT_VHDR_RC_NOT_AUTH = 0x87,
137 MQTT_VHDR_RC_SRV_UNAVAIL = 0x88,
138 MQTT_VHDR_RC_SRV_BUSY = 0x89,
139 MQTT_VHDR_RC_BANNED = 0x8A,
140 MQTT_VHDR_RC_SRV_SHUTDOWN = 0x8B,
141 MQTT_VHDR_RC_BAD_AUTH_METHOD = 0x8C,
142 MQTT_VHDR_RC_KEEP_ALIVE_TIMEOUT = 0x8D,
143 MQTT_VHDR_RC_SESS_TAKEN_OVER = 0x8E,
144 MQTT_VHDR_RC_TOPIC_FILT_INVAL = 0x8F,
145 MQTT_VHDR_RC_TOPIC_NAME_INVAL = 0x90,
146 MQTT_VHDR_RC_PKT_ID_IN_USE = 0x91,
147 MQTT_VHDR_RC_PKT_ID_NOT_FOUND = 0x92,
148 MQTT_VHDR_RC_RECV_MAX_EXCEEDED = 0x93,
149 MQTT_VHDR_RC_TOPIC_ALIAS_INVAL = 0x94,
150 MQTT_VHDR_RC_PKT_TOO_LARGE = 0x95,
151 MQTT_VHDR_RC_MSG_RATE_TOO_HIGH = 0x96,
152 MQTT_VHDR_RC_QUOTA_EXCEEDED = 0x97,
153 MQTT_VHDR_RC_ADMIN_ACTION = 0x98,
154 MQTT_VHDR_RC_PAYLD_FMT_INVAL = 0x99,
155 MQTT_VHDR_RC_RETAIN_UNSUPPORTED = 0x9A,
156 MQTT_VHDR_RC_QOS_UNSUPPORTED = 0x9B,
157 MQTT_VHDR_RC_USE_ANOTHER_SRV = 0x9C,
158 MQTT_VHDR_RC_SRV_MOVED = 0x9D,
159 MQTT_VHDR_RC_SHARED_SUB_UNSUPPORTED = 0x9E,
160 MQTT_VHDR_RC_CONN_RATE_EXCEEDED = 0x9F,
161 MQTT_VHDR_RC_MAX_CONN_TIME = 0xA0,
162 MQTT_VHDR_RC_SUB_ID_UNSUPPORTED = 0xA1,
163 MQTT_VHDR_RC_WILD_SUB_UNSUPPORTED = 0xA2,
164 } mqtt_reason_code_t;
166 #define RESPONSE_WAIT_TIMEOUT (CLOCK_SECOND * 10) 168 #define INCREMENT_MID(conn) (conn)->mid_counter += 2 169 #define MQTT_STRING_LENGTH(s) (((s)->length) == 0 ? 0 : (MQTT_STRING_LEN_SIZE + (s)->length)) 172 #define PT_MQTT_WRITE_BYTES(conn, data, len) \ 173 conn->out_write_pos = 0; \ 174 while(write_bytes(conn, data, len)) { \ 175 PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \ 178 #define PT_MQTT_WRITE_BYTE(conn, data) \ 179 while(write_byte(conn, data)) { \ 180 PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \ 189 #define PT_MQTT_WAIT_SEND() \ 191 if (PROCESS_ERR_OK == \ 192 process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL)) { \ 194 PROCESS_WAIT_EVENT(); \ 195 if(ev == mqtt_abort_now_event) { \ 196 conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \ 197 PT_INIT(&conn->out_proto_thread); \ 198 process_post(PROCESS_CURRENT(), ev, data); \ 199 } else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \ 200 process_post(PROCESS_CURRENT(), ev, data); \ 202 } while (ev != mqtt_continue_send_event); \ 206 static process_event_t mqtt_do_connect_tcp_event;
/*
 * Process event identifiers used by this module. All of these are file-local
 * except mqtt_update_event, which has external linkage and is posted to
 * conn->app_process by call_event and publish_pt.
 */
207 static process_event_t mqtt_do_connect_mqtt_event;
208 static process_event_t mqtt_do_disconnect_mqtt_event;
209 static process_event_t mqtt_do_subscribe_event;
210 static process_event_t mqtt_do_unsubscribe_event;
211 static process_event_t mqtt_do_publish_event;
212 static process_event_t mqtt_do_pingreq_event;
213 static process_event_t mqtt_continue_send_event;
214 static process_event_t mqtt_abort_now_event;
215 static process_event_t mqtt_do_auth_event;
216 process_event_t mqtt_update_event;
/*
 * Bounds of the event-id range that PT_MQTT_WAIT_SEND re-posts while it waits
 * for mqtt_continue_send_event (it tests ev >= mqtt_event_min &&
 * ev <= mqtt_event_max).
 */
223 static process_event_t mqtt_event_min;
224 static process_event_t mqtt_event_max;
/*
 * Forward declarations for tcp_input, tcp_event and reset_packet, which are
 * used before their definitions (reset_packet is called from reset_defaults).
 * tcp_input and tcp_event are presumably the callbacks handed to
 * tcp_socket_register - the argument list of that call is not fully visible
 * here, so confirm.
 */
228 tcp_input(
struct tcp_socket *s,
void *ptr,
const uint8_t *input_data_ptr,
231 static void tcp_event(
struct tcp_socket *s,
void *ptr,
232 tcp_socket_event_t event);
234 static void reset_packet(
struct mqtt_in_packet *packet);
/* List object for MQTT connections and the declaration of the MQTT process. */
236 LIST(mqtt_conn_list);
238 PROCESS(mqtt_process,
"MQTT process");
/*
 * Deliver an MQTT event to the application: invokes conn->event_callback with
 * the event and its data, then posts mqtt_update_event (with NULL data) to
 * conn->app_process.
 */
241 call_event(
struct mqtt_connection *conn,
245 conn->event_callback(conn, event, data);
246 process_post(conn->app_process, mqtt_update_event, NULL);
/*
 * Restore the per-connection protocol state to its starting values: message
 * id counter back to 1, output protothread re-initialised, no PINGRESP
 * pending, input packet cleared and out_buffer_sent cleared.
 * Called from connect_tcp before the socket is registered.
 */
250 reset_defaults(
struct mqtt_connection *conn)
252 conn->mid_counter = 1;
253 PT_INIT(&conn->out_proto_thread);
254 conn->waiting_for_pingresp = 0;
256 reset_packet(&conn->in_packet);
257 conn->out_buffer_sent = 0;
/*
 * Drop the connection immediately: rewinds the output buffer pointer to the
 * start of out_buffer, clears out_queue_full, zeroes out_packet, closes and
 * unregisters the TCP socket, zeroes the socket structure and sets the state
 * to MQTT_CONN_STATE_NOT_CONNECTED.
 */
261 abort_connection(
struct mqtt_connection *conn)
263 conn->out_buffer_ptr = conn->out_buffer;
264 conn->out_queue_full = 0;
267 memset(&conn->out_packet, 0,
sizeof(conn->out_packet));
269 tcp_socket_close(&conn->socket);
270 tcp_socket_unregister(&conn->socket);
/* Wipe the socket structure after it has been unregistered. */
272 memset(&conn->socket, 0,
sizeof(conn->socket));
274 conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
/*
 * Start the TCP connection to the broker: sets the state to
 * MQTT_CONN_STATE_TCP_CONNECTING, resets the protocol defaults, registers the
 * socket (buffer sizes MQTT_TCP_INPUT_BUFF_SIZE and MQTT_TCP_OUTPUT_BUFF_SIZE
 * appear in the argument list) and connects to conn->server_ip on
 * conn->server_port.
 */
278 connect_tcp(
struct mqtt_connection *conn)
280 conn->state = MQTT_CONN_STATE_TCP_CONNECTING;
282 reset_defaults(conn);
283 tcp_socket_register(&(conn->socket),
286 MQTT_TCP_INPUT_BUFF_SIZE,
288 MQTT_TCP_OUTPUT_BUFF_SIZE,
291 tcp_socket_connect(&(conn->socket), &(conn->server_ip), conn->server_port);
/*
 * Close the TCP side of the connection: sets the state to
 * MQTT_CONN_STATE_DISCONNECTING, closes and unregisters the socket and zeroes
 * the socket structure. Unlike abort_connection it leaves out_packet and the
 * output buffer untouched.
 */
295 disconnect_tcp(
struct mqtt_connection *conn)
297 conn->state = MQTT_CONN_STATE_DISCONNECTING;
298 tcp_socket_close(&(conn->socket));
299 tcp_socket_unregister(&conn->socket);
301 memset(&conn->socket, 0,
sizeof(conn->socket));
/*
 * Hand the bytes accumulated in conn->out_buffer to the TCP socket.
 * The amount pending is out_buffer_ptr - out_buffer.
 */
305 send_out_buffer(
struct mqtt_connection *conn)
/*
 * Nothing buffered: flag the buffer as already sent. Presumably the function
 * returns here - the return is not visible in this view.
 */
307 if(conn->out_buffer_ptr - conn->out_buffer == 0) {
308 conn->out_buffer_sent = 1;
/* Data pending: clear the flag that the write macros wait on, then send. */
311 conn->out_buffer_sent = 0;
313 DBG(
"MQTT - (send_out_buffer) Space used in buffer: %i\n",
314 conn->out_buffer_ptr - conn->out_buffer);
316 tcp_socket_send(&conn->socket, conn->out_buffer,
317 conn->out_buffer_ptr - conn->out_buffer);
/*
 * Point an mqtt_string at a C string without copying it: stores the pointer
 * and sets the length either to strlen(string) or to 0. The condition that
 * selects between the two is not visible here (presumably a NULL check on
 * string - confirm). A NULL mqtt_string is checked for first.
 */
321 string_to_mqtt_string(
struct mqtt_string *mqtt_string,
char *
string)
323 if(mqtt_string == NULL) {
326 mqtt_string->string = string;
329 mqtt_string->length = strlen(
string);
331 mqtt_string->length = 0;
/*
 * Append one byte to the connection's output buffer.
 * PT_MQTT_WRITE_BYTE calls this in a loop and waits for out_buffer_sent while
 * the result is non-zero, so a non-zero result presumably means "buffer was
 * full, retry" - the return statements are not visible in this view.
 */
336 write_byte(
struct mqtt_connection *conn, uint8_t data)
338 DBG(
"MQTT - (write_byte) buff_size: %i write: '%02X'\n",
339 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
/* No room left: flush what is buffered instead of storing the byte. */
342 if(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr == 0) {
343 send_out_buffer(conn);
347 *conn->out_buffer_ptr = data;
348 conn->out_buffer_ptr++;
/*
 * Copy as much of data as fits into the output buffer, resuming at
 * conn->out_write_pos. When everything has been copied out_write_pos is reset
 * to 0; otherwise the buffer is flushed and the number of bytes still to be
 * written (len - out_write_pos) is returned, so PT_MQTT_WRITE_BYTES calls
 * again once the buffer has been sent.
 */
353 write_bytes(
struct mqtt_connection *conn, uint8_t *data, uint16_t len)
355 uint16_t write_bytes;
/* Chunk size: the smaller of the free buffer space and the bytes left in data. */
357 MIN(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
358 len - conn->out_write_pos);
360 memcpy(conn->out_buffer_ptr, &data[conn->out_write_pos], write_bytes);
361 conn->out_write_pos += write_bytes;
362 conn->out_buffer_ptr += write_bytes;
364 DBG(
"MQTT - (write_bytes) len: %u write_pos: %i\n", len,
365 conn->out_write_pos);
/* All of data is buffered: reset the resume position for the next call. */
367 if(len - conn->out_write_pos == 0) {
368 conn->out_write_pos = 0;
/* Buffer filled before data ran out: flush and report the remainder. */
371 send_out_buffer(conn);
372 return len - conn->out_write_pos;
/*
 * Decode an MQTT Variable Byte Integer from input_data_ptr, starting at
 * *input_pos. Each input byte contributes its low 7 bits, scaled by
 * multiplier, to *dest; decoding continues while bit 7 of the byte is set.
 * tcp_input stores the result as the number of length bytes and treats 0 as
 * an error.
 *
 * As far as visible, *dest is only added to, never assigned, so it
 * presumably has to be zero on entry - confirm.
 * NOTE(review): multiplier is a uint8_t. If the update step (not visible
 * here) multiplies it by 128 per byte, it overflows on the third byte and
 * lengths above 16383 decode wrongly - confirm and widen if so.
 */
377 mqtt_decode_var_byte_int(
const uint8_t *input_data_ptr,
380 uint32_t *pkt_byte_count,
383 uint8_t read_bytes = 0;
385 uint8_t multiplier = 1;
386 uint32_t input_pos_0 = 0;
/* A NULL input_pos means "start at offset 0" using a local cursor. */
388 if(input_pos == NULL) {
389 input_pos = &input_pos_0;
/* Stop when the input is exhausted before the integer is complete. */
393 if(*input_pos >= input_data_len) {
397 byte_in = input_data_ptr[*input_pos];
403 DBG(
"MQTT - Read Variable Byte Integer byte %i\n", byte_in);
406 DBG(
"Received more than 4 byte 'Variable Byte Integer'.");
410 *dest += (byte_in & 127) * multiplier;
412 }
while((byte_in & 128) != 0);
/*
 * Encode val as an MQTT Variable Byte Integer into vbi_out, one digit per
 * byte, counting the bytes in *vbi_bytes. A digit gets the continuation bit
 * 0x80 (the condition for that is not visible here). The loop runs while val
 * is non-zero and fewer than 5 bytes have been produced; the *_pt senders in
 * this file treat a result of more than 4 bytes as a protocol error.
 */
418 mqtt_encode_var_byte_int(uint8_t *vbi_out,
424 DBG(
"MQTT - Encoding Variable Byte Integer %u\n", val);
431 digit = digit | 0x80;
434 vbi_out[*vbi_bytes] = digit;
436 DBG(
"MQTT - Encode VBI digit '%u' length '%i'\n", digit, val);
437 }
while(val > 0 && *vbi_bytes < 5);
438 DBG(
"MQTT - var_byte_int bytes %u\n", *vbi_bytes);
/*
 * Keep-alive callback; ptr is the mqtt_connection (handle_connack passes
 * this function together with conn). If the previous PINGREQ is still
 * unanswered the TCP connection is closed; the visible path then posts
 * mqtt_do_pingreq_event to mqtt_process. Whether the unanswered case returns
 * before posting is not visible here - confirm.
 */
442 keep_alive_callback(
void *ptr)
444 struct mqtt_connection *conn = ptr;
446 DBG(
"MQTT - (keep_alive_callback) Called!\n");
449 if(conn->waiting_for_pingresp) {
450 PRINTF(
"MQTT - Disconnect due to no PINGRESP from broker.\n");
451 disconnect_tcp(conn);
455 process_post(&mqtt_process, mqtt_do_pingreq_event, conn);
/*
 * Zero the whole incoming-packet structure so that parsing of the next
 * packet starts from a clean state.
 */
459 reset_packet(
struct mqtt_in_packet *packet)
461 memset(packet, 0,
sizeof(
struct mqtt_in_packet));
/*
 * Protothread that writes a property list to the output buffer: first the
 * encoded properties length, then for each property in prop_list->props its
 * id followed by further bytes (that argument list is not visible here).
 * The "No properties to write" path emits a single 0 byte instead.
 *
 * NOTE(review): the callers in this file invoke this function directly with
 * their own pt rather than through a protothread wait/spawn macro - confirm
 * that sharing the parent's pt is intended.
 */
466 PT_THREAD(write_out_props(
struct pt *pt,
struct mqtt_connection *conn,
467 struct mqtt_prop_list *prop_list))
/* static, presumably so the cursor survives the blocking write macros. */
471 static struct mqtt_prop_out_property *prop;
474 DBG(
"MQTT - Writing %i property bytes\n", prop_list->properties_len + prop_list->properties_len_enc_bytes);
476 PT_MQTT_WRITE_BYTES(conn,
477 prop_list->properties_len_enc,
478 prop_list->properties_len_enc_bytes);
480 prop = (
struct mqtt_prop_out_property *)
list_head(prop_list->props);
483 DBG(
"MQTT - Property ID %i len %i\n", prop->id, prop->property_len);
484 PT_MQTT_WRITE_BYTE(conn, prop->id);
485 PT_MQTT_WRITE_BYTES(conn,
490 }
while(prop != NULL);
493 DBG(
"MQTT - No properties to write\n");
494 PT_MQTT_WRITE_BYTE(conn, 0);
/*
 * Protothread that builds and sends the MQTT CONNECT packet and then waits
 * for the acknowledgement (out_packet.qos_state becoming
 * MQTT_QOS_STATE_GOT_ACK) or a timeout armed with RESPONSE_WAIT_TIMEOUT.
 * Order of the packet as written below: fixed header, remaining length,
 * protocol name and version, connect flags, keep-alive, properties, client
 * id, then the optional will, username and password fields.
 */
502 PT_THREAD(connect_pt(
struct pt *pt,
struct mqtt_connection *conn))
/*
 * NOTE(review): will_props is static and is only assigned when
 * conn->will.properties is set, so a value from an earlier invocation stays
 * in place when a later connection has no will properties - confirm.
 */
507 static struct mqtt_prop_list *will_props = MQTT_PROP_LIST_NONE;
508 if(conn->will.properties) {
509 will_props = (
struct mqtt_prop_list *)
list_head(conn->will.properties);
513 DBG(
"MQTT - Sending CONNECT message...\n");
/* Compute the remaining length from every field that will be written. */
516 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_CONNECT;
517 conn->out_packet.remaining_length = 0;
518 conn->out_packet.remaining_length += MQTT_CONNECT_VHDR_SIZE;
519 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->client_id);
520 #if (MQTT_PROTOCOL_VERSION > MQTT_PROTOCOL_VERSION_3_1) && MQTT_SRV_SUPPORTS_EMPTY_CLIENT_ID 522 if(MQTT_STRING_LENGTH(&conn->client_id) == 0) {
523 conn->out_packet.remaining_length += 2;
526 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.username);
527 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.password);
528 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.topic);
529 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.message);
533 conn->out_packet.remaining_length +=
534 conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
538 if(conn->connect_vhdr_flags & MQTT_VHDR_WILL_FLAG) {
539 conn->out_packet.remaining_length +=
540 will_props ? will_props->properties_len + will_props->properties_len_enc_bytes
/* Encode the remaining length; more than 4 encoded bytes is reported as a protocol error. */
545 mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
546 &conn->out_packet.remaining_length_enc_bytes,
547 conn->out_packet.remaining_length);
548 if(conn->out_packet.remaining_length_enc_bytes > 4) {
549 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
550 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
/* Fixed header and variable header. */
555 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
556 PT_MQTT_WRITE_BYTES(conn,
557 conn->out_packet.remaining_length_enc,
558 conn->out_packet.remaining_length_enc_bytes);
/* Protocol name as a length-prefixed string: high length byte 0, then strlen. */
559 PT_MQTT_WRITE_BYTE(conn, 0);
560 PT_MQTT_WRITE_BYTE(conn, strlen(MQTT_PROTOCOL_NAME));
561 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)MQTT_PROTOCOL_NAME, strlen(MQTT_PROTOCOL_NAME));
562 PT_MQTT_WRITE_BYTE(conn, MQTT_PROTOCOL_VERSION);
563 PT_MQTT_WRITE_BYTE(conn, conn->connect_vhdr_flags);
/* Keep-alive, high byte first. */
564 PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive >> 8));
565 PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive & 0x00FF));
569 write_out_props(pt, conn, conn->out_props);
/* Payload: client id as a length-prefixed string. */
573 PT_MQTT_WRITE_BYTE(conn, conn->client_id.length >> 8);
574 PT_MQTT_WRITE_BYTE(conn, conn->client_id.length & 0x00FF);
575 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->client_id.string,
576 conn->client_id.length);
/* Optional will: properties, topic and message. */
578 if(conn->connect_vhdr_flags & MQTT_VHDR_WILL_FLAG) {
581 DBG(
"MQTT - Writing will properties\n");
582 write_out_props(pt, conn, will_props);
584 PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length >> 8);
585 PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length & 0x00FF);
586 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.topic.string,
587 conn->will.topic.length);
588 PT_MQTT_WRITE_BYTE(conn, conn->will.message.length >> 8);
589 PT_MQTT_WRITE_BYTE(conn, conn->will.message.length & 0x00FF);
590 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.message.string,
591 conn->will.message.length);
592 DBG(
"MQTT - Setting will topic to '%s' %u bytes and message to '%s' %u bytes\n",
593 conn->will.topic.string,
594 conn->will.topic.length,
595 conn->will.message.string,
596 conn->will.message.length);
/* Optional credentials, each as a length-prefixed string. */
598 if(conn->connect_vhdr_flags & MQTT_VHDR_USERNAME_FLAG) {
599 PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length >> 8);
600 PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length & 0x00FF);
601 PT_MQTT_WRITE_BYTES(conn,
602 (uint8_t *)conn->credentials.username.string,
603 conn->credentials.username.length);
605 if(conn->connect_vhdr_flags & MQTT_VHDR_PASSWORD_FLAG) {
606 PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length >> 8);
607 PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length & 0x00FF);
608 PT_MQTT_WRITE_BYTES(conn,
609 (uint8_t *)conn->credentials.password.string,
610 conn->credentials.password.length);
/* Flush the packet, switch state and wait for the acknowledgement or the timeout. */
614 send_out_buffer(conn);
615 conn->state = MQTT_CONN_STATE_CONNECTING_TO_BROKER;
617 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
620 reset_packet(&conn->in_packet);
621 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
624 DBG(
"Timeout waiting for CONNACK\n");
632 reset_packet(&conn->in_packet);
634 DBG(
"MQTT - Done sending CONNECT\n");
/* Debug dump of the bytes currently in the output buffer. */
637 DBG(
"MQTT - CONNECT message sent: \n");
639 for(i = 0; i < (conn->out_buffer_ptr - conn->out_buffer); i++) {
640 DBG(
"%02X ", conn->out_buffer[i]);
/*
 * Protothread that sends an MQTT DISCONNECT packet: the fixed header byte, a
 * 0 byte, the properties in conn->out_props, then a flush of the output
 * buffer. Which of these writes are compiled in for a given protocol version
 * is not visible here.
 */
649 PT_THREAD(disconnect_pt(
struct pt *pt,
struct mqtt_connection *conn))
653 PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_DISCONNECT);
654 PT_MQTT_WRITE_BYTE(conn, 0);
658 write_out_props(pt, conn, conn->out_props);
661 send_out_buffer(conn);
/*
 * Protothread that sends a SUBSCRIBE for the single topic held in
 * conn->out_packet and then waits for the acknowledgement (qos_state ==
 * MQTT_QOS_STATE_GOT_ACK) or a timeout armed with RESPONSE_WAIT_TIMEOUT.
 * Clears out_queue_full when done.
 */
675 PT_THREAD(subscribe_pt(
struct pt *pt,
struct mqtt_connection *conn))
679 DBG(
"MQTT - Sending subscribe message! topic %s topic_length %i\n",
680 conn->out_packet.topic,
681 conn->out_packet.topic_length);
682 DBG(
"MQTT - Buffer space is %i \n",
683 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
/* SUBSCRIBE is sent with the QoS 1 bit set in the fixed header. */
686 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_SUBSCRIBE | MQTT_FHDR_QOS_LEVEL_1;
687 conn->out_packet.remaining_length = MQTT_MID_SIZE +
688 MQTT_STRING_LEN_SIZE +
689 conn->out_packet.topic_length +
693 conn->out_packet.remaining_length +=
694 conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
/* Encode the remaining length; more than 4 encoded bytes is reported as a protocol error. */
698 mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
699 &conn->out_packet.remaining_length_enc_bytes,
700 conn->out_packet.remaining_length);
701 if(conn->out_packet.remaining_length_enc_bytes > 4) {
702 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
703 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
/* Fixed header, remaining length, then the message id (high byte first). */
708 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
709 PT_MQTT_WRITE_BYTES(conn,
710 conn->out_packet.remaining_length_enc,
711 conn->out_packet.remaining_length_enc_bytes);
713 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
714 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
718 write_out_props(pt, conn, conn->out_props);
/* Topic filter as a length-prefixed string. */
722 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
723 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
724 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
725 conn->out_packet.topic_length);
/*
 * Trailing options byte: sub_options or the plain QoS. Presumably selected by
 * protocol version - the conditional is not visible here.
 */
728 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.sub_options);
730 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.qos);
/* Flush and wait for the acknowledgement or the timeout. */
734 send_out_buffer(conn);
735 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
738 reset_packet(&conn->in_packet);
739 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
743 DBG(
"Timeout waiting for SUBACK\n");
745 reset_packet(&conn->in_packet);
748 conn->out_queue_full = 0;
750 DBG(
"MQTT - Done in send_subscribe!\n");
/*
 * Protothread that sends an UNSUBSCRIBE for the topic held in
 * conn->out_packet and then waits for the acknowledgement (qos_state ==
 * MQTT_QOS_STATE_GOT_ACK) or a timeout armed with RESPONSE_WAIT_TIMEOUT.
 * Clears out_queue_full when done.
 * NOTE(review): the final debug message says "subscribe" although this is the
 * unsubscribe path - looks like a copy/paste leftover.
 */
756 PT_THREAD(unsubscribe_pt(
struct pt *pt,
struct mqtt_connection *conn))
760 DBG(
"MQTT - Sending unsubscribe message on topic %s topic_length %i\n",
761 conn->out_packet.topic,
762 conn->out_packet.topic_length);
763 DBG(
"MQTT - Buffer space is %i \n",
764 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
/* UNSUBSCRIBE is sent with the QoS 1 bit set in the fixed header. */
767 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE |
768 MQTT_FHDR_QOS_LEVEL_1;
769 conn->out_packet.remaining_length = MQTT_MID_SIZE +
770 MQTT_STRING_LEN_SIZE +
771 conn->out_packet.topic_length;
774 conn->out_packet.remaining_length +=
775 conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
/* Encode the remaining length; more than 4 encoded bytes is reported as a protocol error. */
779 mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
780 &conn->out_packet.remaining_length_enc_bytes,
781 conn->out_packet.remaining_length);
782 if(conn->out_packet.remaining_length_enc_bytes > 4) {
783 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
784 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
/* Fixed header, remaining length, then the message id (high byte first). */
789 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
790 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
791 conn->out_packet.remaining_length_enc_bytes);
794 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
795 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
798 write_out_props(pt, conn, conn->out_props);
/* Topic filter as a length-prefixed string. */
802 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
803 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
804 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
805 conn->out_packet.topic_length);
/* Flush and wait for the acknowledgement or the timeout. */
808 send_out_buffer(conn);
809 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
812 reset_packet(&conn->in_packet);
813 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
817 DBG(
"Timeout waiting for UNSUBACK\n");
820 reset_packet(&conn->in_packet);
823 conn->out_queue_full = 0;
825 DBG(
"MQTT - Done writing subscribe message to out buffer!\n");
/*
 * Protothread that sends a PUBLISH built from conn->out_packet (topic, QoS,
 * retain flag, payload). For QoS 0 it posts mqtt_update_event to the
 * application process right after sending; for QoS 1 it waits for the
 * acknowledgement or a timeout; for QoS 2 it only logs that the level is not
 * implemented. Clears out_queue_full when done.
 */
831 PT_THREAD(publish_pt(
struct pt *pt,
struct mqtt_connection *conn))
835 DBG(
"MQTT - Sending publish message! topic %s topic_length %i\n",
836 conn->out_packet.topic,
837 conn->out_packet.topic_length);
838 DBG(
"MQTT - Buffer space is %i \n",
839 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
/* The QoS level sits one bit up in the fixed header, hence the shift by 1. */
842 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_PUBLISH |
843 conn->out_packet.qos << 1;
844 if(conn->out_packet.retain == MQTT_RETAIN_ON) {
845 conn->out_packet.fhdr |= MQTT_FHDR_RETAIN_FLAG;
847 conn->out_packet.remaining_length = MQTT_STRING_LEN_SIZE +
848 conn->out_packet.topic_length +
849 conn->out_packet.payload_size;
/* A message id is only carried for QoS above 0. */
850 if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
851 conn->out_packet.remaining_length += MQTT_MID_SIZE;
855 conn->out_packet.remaining_length +=
856 conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
/* Encode the remaining length; more than 4 encoded bytes is reported as a protocol error. */
860 mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
861 &conn->out_packet.remaining_length_enc_bytes,
862 conn->out_packet.remaining_length);
863 if(conn->out_packet.remaining_length_enc_bytes > 4) {
864 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
865 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
/* The DUP flag is cleared for QoS 0 messages. */
870 if(conn->out_packet.qos == MQTT_QOS_LEVEL_0) {
871 conn->out_packet.fhdr &= ~MQTT_FHDR_DUP_FLAG;
/* Fixed header, remaining length, topic, optional message id, properties, payload. */
875 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
876 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
877 conn->out_packet.remaining_length_enc_bytes);
879 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
880 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
881 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
882 conn->out_packet.topic_length);
883 if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
884 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
885 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
890 write_out_props(pt, conn, conn->out_props);
894 PT_MQTT_WRITE_BYTES(conn,
895 conn->out_packet.payload,
896 conn->out_packet.payload_size);
898 send_out_buffer(conn);
899 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
/* Completion handling depends on the QoS level. */
907 if(conn->out_packet.qos == 0) {
908 process_post(conn->app_process, mqtt_update_event, NULL);
909 }
else if(conn->out_packet.qos == 1) {
911 reset_packet(&conn->in_packet);
912 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
915 DBG(
"Timeout waiting for PUBACK\n");
/* A mismatching message id is only logged; no retry is visible here. */
917 if(conn->in_packet.mid != conn->out_packet.mid) {
918 DBG(
"MQTT - Warning, got PUBACK with none matching MID. Currently there " 919 "is no support for several concurrent PUBLISH messages.\n");
921 }
else if(conn->out_packet.qos == 2) {
922 DBG(
"MQTT - QoS not implemented yet.\n");
926 reset_packet(&conn->in_packet);
929 conn->out_queue_full = 0;
931 DBG(
"MQTT - Publish Enqueued\n");
/*
 * Protothread that sends a PINGREQ (fixed header byte followed by a 0 byte)
 * and marks the connection as waiting for a PINGRESP. The wait itself is not
 * visible here; afterwards the input packet is reset and
 * waiting_for_pingresp is cleared.
 */
937 PT_THREAD(pingreq_pt(
struct pt *pt,
struct mqtt_connection *conn))
941 DBG(
"MQTT - Sending PINGREQ\n");
944 PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_PINGREQ);
945 PT_MQTT_WRITE_BYTE(conn, 0);
947 send_out_buffer(conn);
/* keep_alive_callback closes the TCP connection if this flag is still set when it fires. */
950 conn->waiting_for_pingresp = 1;
953 reset_packet(&conn->in_packet);
954 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
958 reset_packet(&conn->in_packet);
960 conn->waiting_for_pingresp = 0;
/*
 * Protothread that sends an AUTH packet: fixed header, encoded remaining
 * length, the reason code from out_packet.auth_reason_code, the properties
 * in conn->out_props, then a flush of the output buffer.
 * NOTE(review): remaining_length is only added to in the visible code, so it
 * presumably has to be initialised by the caller - confirm.
 */
967 PT_THREAD(auth_pt(
struct pt *pt,
struct mqtt_connection *conn))
971 conn->out_packet.remaining_length +=
972 conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
975 mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
976 &conn->out_packet.remaining_length_enc_bytes,
977 conn->out_packet.remaining_length);
/* More than 4 encoded length bytes is reported as a protocol error. */
979 if(conn->out_packet.remaining_length_enc_bytes > 4) {
980 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
981 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
986 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
987 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
988 conn->out_packet.remaining_length_enc_bytes);
991 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.auth_reason_code);
994 write_out_props(pt, conn, conn->out_props);
997 send_out_buffer(conn);
/*
 * Process a received CONNACK. Malformed lengths and non-zero return codes
 * abort the connection. Otherwise the pending CONNECT is marked as
 * acknowledged, the state becomes MQTT_CONN_STATE_CONNECTED_TO_BROKER and
 * MQTT_EVENT_CONNECTED is raised with a connack_event.
 */
1006 handle_connack(
struct mqtt_connection *conn)
1008 struct mqtt_connack_event connack_event;
1010 DBG(
"MQTT - Got CONNACK\n");
/* Up to protocol 3.1.1 the CONNACK remaining length must be exactly 2. */
1012 #if MQTT_PROTOCOL_VERSION <= MQTT_PROTOCOL_VERSION_3_1_1 1013 if(conn->in_packet.remaining_length != 2) {
1014 PRINTF(
"MQTT - CONNACK VHDR remaining length %i incorrect\n",
1015 conn->in_packet.remaining_length);
1019 abort_connection(conn);
/* payload[1] carries the return code; anything but 0 means refused. */
1023 if(conn->in_packet.payload[1] != 0) {
1024 PRINTF(
"MQTT - Connection refused with Return Code %i\n",
1025 conn->in_packet.payload[1]);
1027 MQTT_EVENT_CONNECTION_REFUSED_ERROR,
1028 &conn->in_packet.payload[1]);
1029 abort_connection(conn);
/* Releases the wait in connect_pt. */
1034 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
1036 #if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_3_1_1 1037 connack_event.session_present = conn->in_packet.payload[0] & MQTT_VHDR_CONNACK_SESSION_PRESENT;
/* From protocol 5 on, at least 3 bytes are required before the properties are parsed. */
1040 #if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_5 1046 if(conn->in_packet.remaining_length < 3) {
1047 PRINTF(
"MQTT - CONNACK VHDR remaining length %i incorrect\n",
1048 conn->in_packet.remaining_length);
1052 abort_connection(conn);
1055 mqtt_prop_parse_connack_props(conn);
1059 keep_alive_callback, conn);
1062 conn->state = MQTT_CONN_STATE_CONNECTED_TO_BROKER;
1063 call_event(conn, MQTT_EVENT_CONNECTED, &connack_event);
/*
 * Handler for a received PINGRESP. The only statement visible here is the
 * debug message.
 */
1067 handle_pingresp(
struct mqtt_connection *conn)
1069 DBG(
"MQTT - Got PINGRESP\n");
/*
 * Process a received SUBACK: marks the pending SUBSCRIBE as acknowledged,
 * fills a mqtt_suback_event from the message id and the first return-code
 * byte (in_packet.payload_start[0]) and raises MQTT_EVENT_SUBACK.
 */
1073 handle_suback(
struct mqtt_connection *conn)
1075 struct mqtt_suback_event suback_event;
1077 DBG(
"MQTT - Got SUBACK\n");
/*
 * Two variants of the size check, with and without property lengths -
 * presumably selected by protocol version (the conditional is not visible).
 * Both flag a SUBACK that covers more than MQTT_MAX_TOPICS_PER_SUBSCRIBE
 * topics.
 */
1081 if(conn->in_packet.remaining_length > MQTT_MID_SIZE +
1082 MQTT_MAX_TOPICS_PER_SUBSCRIBE * MQTT_QOS_SIZE +
1083 conn->in_packet.properties_len + conn->in_packet.properties_enc_len) {
1085 if(conn->in_packet.remaining_length > MQTT_MID_SIZE +
1086 MQTT_MAX_TOPICS_PER_SUBSCRIBE * MQTT_QOS_SIZE) {
1088 DBG(
"MQTT - Error, SUBACK with > 1 topic, not supported.\n");
/* Releases the wait in subscribe_pt. */
1091 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
1093 suback_event.mid = conn->in_packet.mid;
/* success defaults to 0 and is set to 1 only for the granted-QoS codes. */
1096 suback_event.success = 0;
1098 switch(conn->in_packet.payload_start[0]) {
1099 case MQTT_SUBACK_RET_FAIL:
1100 PRINTF(
"MQTT - Error, SUBSCRIBE failed with SUBACK return code '%x'", conn->in_packet.payload_start[0]);
1103 case MQTT_SUBACK_RET_QOS_0:
1104 case MQTT_SUBACK_RET_QOS_1:
1105 case MQTT_SUBACK_RET_QOS_2:
1106 suback_event.qos_level = conn->in_packet.payload_start[0] & 0x03;
1107 suback_event.success = 1;
1111 PRINTF(
"MQTT - Error, Unrecognised SUBACK return code '%x'", conn->in_packet.payload_start[0]);
1115 suback_event.return_code = conn->in_packet.payload_start[0];
1117 suback_event.qos_level = conn->in_packet.payload_start[0];
/* A mismatching message id is only logged; the event is raised regardless. */
1120 if(conn->in_packet.mid != conn->out_packet.mid) {
1121 DBG(
"MQTT - Warning, got SUBACK with none matching MID. Currently there is" 1122 "no support for several concurrent SUBSCRIBE messages.\n");
1126 call_event(conn, MQTT_EVENT_SUBACK, &suback_event);
/*
 * Process a received UNSUBACK: marks the pending UNSUBSCRIBE as acknowledged
 * (releasing the wait in unsubscribe_pt), logs a message id mismatch and
 * raises MQTT_EVENT_UNSUBACK with a pointer to the received message id.
 */
1130 handle_unsuback(
struct mqtt_connection *conn)
1132 DBG(
"MQTT - Got UNSUBACK\n");
1134 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
1136 if(conn->in_packet.mid != conn->out_packet.mid) {
1137 DBG(
"MQTT - Warning, got UNSUBACK with none matching MID. Currently there is" 1138 "no support for several concurrent UNSUBSCRIBE messages.\n");
1141 call_event(conn, MQTT_EVENT_UNSUBACK, &conn->in_packet.mid);
/*
 * Process a received PUBACK: marks the pending PUBLISH as acknowledged
 * (releasing the QoS 1 wait in publish_pt) and raises MQTT_EVENT_PUBACK with
 * a pointer to the received message id.
 */
1145 handle_puback(
struct mqtt_connection *conn)
1147 DBG(
"MQTT - Got PUBACK\n");
1149 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
1151 call_event(conn, MQTT_EVENT_PUBACK, &conn->in_packet.mid);
/*
 * Deliver one chunk of an incoming PUBLISH to the application through
 * MQTT_EVENT_PUBLISH. Returns MQTT_PUBLISH_ERR when the received topic
 * contains an embedded NUL (its strlen is shorter than the announced topic
 * length), otherwise MQTT_PUBLISH_OK. The input packet is reset once no
 * payload is left.
 */
1154 static mqtt_pub_status_t
1155 handle_publish(
struct mqtt_connection *conn)
1157 DBG(
"MQTT - Got PUBLISH, called once per manageable chunk of message.\n");
1158 DBG(
"MQTT - Handling publish on topic '%s'\n", conn->in_publish_msg.topic);
1160 #if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_3_1_1 1161 if(strlen(conn->in_publish_msg.topic) < conn->in_packet.topic_len) {
1162 DBG(
"NULL detected in received PUBLISH topic\n");
1168 return MQTT_PUBLISH_ERR;
1172 DBG(
"MQTT - This chunk is %i bytes\n", conn->in_publish_msg.payload_chunk_length);
/*
 * NOTE(review): the mask 0x09 selects bits 0 and 3, which this file defines
 * as MQTT_FHDR_RETAIN_FLAG (0x01) and MQTT_FHDR_DUP_FLAG (0x08). The QoS
 * bits are 0x02 and 0x04, so a mask of 0x06 looks intended for a
 * "QoS > 0" test - confirm.
 */
1174 if(((conn->in_packet.fhdr & 0x09) >> 1) > 0) {
1175 PRINTF(
"MQTT - Error, got incoming PUBLISH with QoS > 0, not supported atm!\n");
1178 call_event(conn, MQTT_EVENT_PUBLISH, &conn->in_publish_msg);
/* first_chunk is cleared after the first delivery. */
1180 if(conn->in_publish_msg.first_chunk == 1) {
1181 conn->in_publish_msg.first_chunk = 0;
1185 if(conn->in_publish_msg.payload_left == 0) {
1190 DBG(
"MQTT - (handle_publish) resetting packet.\n");
1191 reset_packet(&conn->in_packet);
1194 return MQTT_PUBLISH_OK;
/*
 * Incrementally parse the variable header of an incoming PUBLISH from the
 * current input chunk: first the 2-byte topic length (high byte first), then
 * the topic itself, copied into in_publish_msg.topic. Progress is kept in
 * conn->in_packet (topic_len_received, topic_pos, topic_received) so parsing
 * can resume with the next chunk. *pos and in_packet.byte_counter advance by
 * the number of bytes consumed.
 */
1198 parse_publish_vhdr(
struct mqtt_connection *conn,
1200 const uint8_t *input_data_ptr,
1203 uint16_t copy_bytes;
/* Stage 1: topic length. */
1206 if(conn->in_packet.topic_len_received == 0) {
1207 conn->in_packet.topic_pos = 0;
1208 conn->in_packet.topic_len = (input_data_ptr[(*pos)++] << 8);
1209 conn->in_packet.byte_counter++;
/* Input ended between the two length bytes; handling is not visible here. */
1210 if(*pos >= input_data_len) {
1213 conn->in_packet.topic_len |= input_data_ptr[(*pos)++];
1214 conn->in_packet.byte_counter++;
1215 conn->in_packet.topic_len_received = 1;
/* Topics longer than MQTT_MAX_TOPIC_LENGTH are detected; the reaction beyond the log is not visible here. */
1217 if(conn->in_packet.topic_len > MQTT_MAX_TOPIC_LENGTH) {
1218 DBG(
"MQTT - topic too long %u/%u\n", conn->in_packet.topic_len, MQTT_MAX_TOPIC_LENGTH);
1221 DBG(
"MQTT - Read PUBLISH topic len %i\n", conn->in_packet.topic_len);
/* Stage 2: copy as much of the topic as this chunk holds. */
1226 if(conn->in_packet.topic_len_received == 1 &&
1227 conn->in_packet.topic_received == 0) {
1228 copy_bytes = MIN(conn->in_packet.topic_len - conn->in_packet.topic_pos,
1229 input_data_len - *pos);
1230 DBG(
"MQTT - topic_pos: %i copy_bytes: %i\n", conn->in_packet.topic_pos,
1232 memcpy(&conn->in_publish_msg.topic[conn->in_packet.topic_pos],
1233 &input_data_ptr[*pos],
1235 (*pos) += copy_bytes;
1236 conn->in_packet.byte_counter += copy_bytes;
1237 conn->in_packet.topic_pos += copy_bytes;
/* Topic complete: terminate it and derive the payload length. */
1239 if(conn->in_packet.topic_len - conn->in_packet.topic_pos == 0) {
1240 DBG(
"MQTT - Got topic '%s'", conn->in_publish_msg.topic);
1241 conn->in_packet.topic_received = 1;
1242 conn->in_publish_msg.topic[conn->in_packet.topic_pos] =
'\0';
/* Payload length = remaining length minus the topic and its 2-byte length prefix. */
1243 conn->in_publish_msg.payload_length =
1244 conn->in_packet.remaining_length - conn->in_packet.topic_len - 2;
1245 conn->in_publish_msg.payload_left = conn->in_publish_msg.payload_length;
1249 conn->in_publish_msg.first_chunk = 1;
/*
 * Process a received DISCONNECT: raises MQTT_EVENT_DISCONNECTED (no data) and
 * then aborts the connection.
 */
1256 handle_disconnect(
struct mqtt_connection *conn)
1258 DBG(
"MQTT - (handle_disconnect) Got DISCONNECT.\n");
1259 call_event(conn, MQTT_EVENT_DISCONNECTED, NULL);
1260 abort_connection(conn);
/*
 * Process a received AUTH packet: validates the fixed header flags, checks
 * the reason code while still connecting, parses the AUTH properties into a
 * mqtt_prop_auth_event and raises MQTT_EVENT_AUTH.
 */
1264 handle_auth(
struct mqtt_connection *conn)
1266 struct mqtt_prop_auth_event event;
1268 DBG(
"MQTT - (handle_auth) Got AUTH.\n");
/* The low nibble of the fixed header must be 0; otherwise the connection is aborted. */
1270 if((conn->in_packet.fhdr & 0x0F) != 0x0) {
1274 abort_connection(conn);
/*
 * While still connecting to the broker, only reason code
 * MQTT_VHDR_RC_CONTINUE_AUTH (0x18) is expected. What follows the log
 * message is not visible here.
 */
1279 if(conn->state == MQTT_CONN_STATE_CONNECTING_TO_BROKER &&
1280 (!conn->in_packet.has_reason_code ||
1281 conn->in_packet.reason_code != MQTT_VHDR_RC_CONTINUE_AUTH)) {
1282 DBG(
"MQTT - (handle_auth) Not reauth - Reason Code 0x18 expected!\n");
1285 mqtt_prop_parse_auth_props(conn, &event);
1286 call_event(conn, MQTT_EVENT_AUTH, &event);
/*
 * Parse the variable header of a fully buffered incoming packet.
 * payload_start begins at the start of in_packet.payload and is advanced
 * past each field consumed: the 2-byte message id for PUBACK, SUBACK and
 * UNSUBACK, then a 1-byte reason code for the packet types listed in the
 * second switch. Finally the input properties are decoded.
 */
1291 parse_vhdr(
struct mqtt_connection *conn)
1293 conn->in_packet.payload_start = conn->in_packet.payload;
/* The packet type is the high nibble of the fixed header. */
1296 switch(conn->in_packet.fhdr & 0xF0) {
1297 case MQTT_FHDR_MSG_TYPE_PUBACK:
1298 case MQTT_FHDR_MSG_TYPE_SUBACK:
1299 case MQTT_FHDR_MSG_TYPE_UNSUBACK:
1300 conn->in_packet.mid = (conn->in_packet.payload[0] << 8) |
1301 (conn->in_packet.payload[1]);
1302 conn->in_packet.payload_start += 2;
/* Packet types that carry a reason code right after any message id. */
1316 switch(conn->in_packet.fhdr & 0xF0) {
1317 case MQTT_FHDR_MSG_TYPE_CONNACK:
1318 case MQTT_FHDR_MSG_TYPE_PUBACK:
1319 case MQTT_FHDR_MSG_TYPE_PUBREC:
1320 case MQTT_FHDR_MSG_TYPE_PUBREL:
1321 case MQTT_FHDR_MSG_TYPE_PUBCOMP:
1322 case MQTT_FHDR_MSG_TYPE_DISCONNECT:
1323 case MQTT_FHDR_MSG_TYPE_AUTH:
1324 conn->in_packet.reason_code = conn->in_packet.payload_start[0];
1325 conn->in_packet.has_reason_code = 1;
1326 conn->in_packet.payload_start += 1;
1330 conn->in_packet.has_reason_code = 0;
/*
 * NOTE(review): properties are decoded when has_props is NOT set, which reads
 * inverted unless has_props means "already decoded" - confirm against
 * mqtt_prop_decode_input_props.
 */
1334 if(!conn->in_packet.has_props) {
1335 mqtt_prop_decode_input_props(conn);
/*
 * TCP input callback. Consumes the bytes in input_data_ptr and assembles
 * an MQTT packet in conn->in_packet: fixed header byte, variable-length
 * "remaining length", then the payload. Parsing state is kept in
 * conn->in_packet (fhdr, has_remaining_length, byte_counter, payload_pos),
 * and each stage is guarded by that state. Once byte_counter reaches
 * MQTT_FHDR_SIZE + remaining_length the packet is dispatched to the
 * handler for its type.
 *
 * 'ptr' is the struct mqtt_connection the data belongs to.
 */
1341 tcp_input(
struct tcp_socket *s,
1343 const uint8_t *input_data_ptr,
1346 struct mqtt_connection *conn = ptr;
1348 uint32_t copy_bytes = 0;
1349 mqtt_pub_status_t pub_status;
1350 uint8_t remaining_length_bytes;
1352 if(input_data_len == 0) {
/* packet_received is set after a packet has been dispatched (end of this
 * function); clear the parser state before starting on the next packet. */
1356 if(conn->in_packet.packet_received) {
1357 reset_packet(&conn->in_packet);
1360 DBG(
"tcp_input with %i bytes of data:\n", input_data_len);
/* A zero fhdr is used as the "fixed header not read yet" marker. The
 * first input byte is the fixed header (the debug text below says VHDR,
 * but the value stored and printed is in_packet.fhdr). */
1363 if(!conn->in_packet.fhdr) {
1364 conn->in_packet.fhdr = input_data_ptr[pos++];
1365 conn->in_packet.byte_counter++;
1367 DBG(
"MQTT - Read VHDR '%02X'\n", conn->in_packet.fhdr);
1369 if(pos >= input_data_len) {
/* Decode the variable-byte "remaining length" field. A return value of 0
 * from the decoder is reported to the application as MQTT_EVENT_ERROR. */
1375 if(!conn->in_packet.has_remaining_length) {
1376 remaining_length_bytes =
1377 mqtt_decode_var_byte_int(input_data_ptr, input_data_len, &pos,
1378 &conn->in_packet.byte_counter,
1379 &conn->in_packet.remaining_length);
1381 if(remaining_length_bytes == 0) {
1382 call_event(conn, MQTT_EVENT_ERROR, NULL);
1386 DBG(
"MQTT - Finished reading remaining length byte\n");
1387 conn->in_packet.has_remaining_length = 1;
/* Only PUBLISH may exceed MQTT_INPUT_BUFF_SIZE. For any other oversized
 * packet the incoming bytes are merely counted, and the packet is flagged
 * as received once the counter reaches the full packet size. */
1396 if((conn->in_packet.remaining_length > MQTT_INPUT_BUFF_SIZE) &&
1397 (conn->in_packet.fhdr & 0xF0) != MQTT_FHDR_MSG_TYPE_PUBLISH) {
1399 PRINTF(
"MQTT - Error, unsupported payload size for non-PUBLISH message\n");
1401 conn->in_packet.byte_counter += input_data_len;
1402 if(conn->in_packet.byte_counter >=
1403 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
1404 conn->in_packet.packet_received = 1;
/* Keep consuming input until every byte of the packet has been counted. */
1415 while(conn->in_packet.byte_counter <
1416 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
/* For PUBLISH, the topic is parsed before any payload is copied. */
1418 if((conn->in_packet.fhdr & 0xF0) == MQTT_FHDR_MSG_TYPE_PUBLISH &&
1419 conn->in_packet.topic_received == 0) {
1420 parse_publish_vhdr(conn, &pos, input_data_ptr, input_data_len);
/* Copy whichever is smaller: the unread input, or the free space left in
 * the payload buffer. */
1424 copy_bytes = MIN(input_data_len - pos,
1425 MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos);
1426 DBG(
"- Copied %i payload bytes\n", copy_bytes);
1427 memcpy(&conn->in_packet.payload[conn->in_packet.payload_pos],
1428 &input_data_ptr[pos],
1430 conn->in_packet.byte_counter += copy_bytes;
1431 conn->in_packet.payload_pos += copy_bytes;
/* NOTE(review): this debug dump prints payload[0..copy_bytes) rather than
 * the region that was just written at the previous payload_pos -- confirm
 * whether that is intended. */
1436 DBG(
"MQTT - Copied bytes: \n");
1437 for(i = 0; i < copy_bytes; i++) {
1438 DBG(
"%02X ", conn->in_packet.payload[i]);
/* The payload buffer is full: hand the buffered chunk to handle_publish()
 * and then rewind payload_pos so the buffer can be reused. */
1444 if(MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos == 0) {
1445 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1446 conn->in_publish_msg.payload_chunk_length = MQTT_INPUT_BUFF_SIZE;
1447 conn->in_publish_msg.payload_left -= MQTT_INPUT_BUFF_SIZE;
1450 if(!conn->in_packet.has_props) {
1451 mqtt_prop_decode_input_props(conn);
/* On the first chunk, shrink and advance the chunk by properties_len +
 * properties_enc_len so that it starts after the encoded properties. */
1454 if(conn->in_publish_msg.first_chunk) {
1455 conn->in_publish_msg.payload_chunk_length -= conn->in_packet.properties_len +
1456 conn->in_packet.properties_enc_len;
1459 conn->in_publish_msg.payload_chunk += conn->in_packet.properties_len +
1460 conn->in_packet.properties_enc_len;
1464 pub_status = handle_publish(conn);
1466 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1467 conn->in_packet.payload_pos = 0;
1469 if(pub_status != MQTT_PUBLISH_OK) {
/* Input exhausted but the packet is not complete yet.
 * NOTE(review): the branch body is not visible in this listing;
 * presumably it returns to wait for more TCP data -- confirm. */
1474 if(pos >= input_data_len &&
1475 (conn->in_packet.byte_counter < (MQTT_FHDR_SIZE + conn->in_packet.remaining_length))) {
1485 DBG(
"MQTT - Finished reading packet!\n");
1487 DBG(
"MQTT - total data was %i bytes of data. \n",
1488 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length));
/* Reason codes at or above MQTT_VHDR_RC_UNSPEC_ERR (0x80) are treated as
 * errors and lead to the connection being aborted. */
1491 if(conn->in_packet.has_reason_code &&
1492 conn->in_packet.reason_code >= MQTT_VHDR_RC_UNSPEC_ERR) {
1493 PRINTF(
"MQTT - Reason Code indicated error %i\n",
1494 conn->in_packet.reason_code);
1498 abort_connection(conn);
/* Dispatch on the packet type held in the high nibble of the fixed
 * header. */
1504 switch(conn->in_packet.fhdr & 0xF0) {
1505 case MQTT_FHDR_MSG_TYPE_CONNACK:
1506 handle_connack(conn);
1508 case MQTT_FHDR_MSG_TYPE_PUBLISH:
/* Final (possibly only) chunk of a PUBLISH: whatever is currently buffered
 * is delivered and payload_left becomes 0. */
1510 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1511 conn->in_publish_msg.payload_chunk_length = conn->in_packet.payload_pos;
1512 conn->in_publish_msg.payload_left = 0;
1514 DBG(
"MQTT - First chunk? %i\n", conn->in_publish_msg.first_chunk);
1516 if(conn->in_publish_msg.first_chunk) {
1517 conn->in_publish_msg.payload_chunk_length -= conn->in_packet.properties_len +
1518 conn->in_packet.properties_enc_len;
1520 conn->in_publish_msg.payload_chunk += conn->in_packet.properties_len +
1521 conn->in_packet.properties_enc_len;
1524 (void)handle_publish(conn);
1526 case MQTT_FHDR_MSG_TYPE_PUBACK:
1527 handle_puback(conn);
1529 case MQTT_FHDR_MSG_TYPE_SUBACK:
1530 handle_suback(conn);
1532 case MQTT_FHDR_MSG_TYPE_UNSUBACK:
1533 handle_unsuback(conn);
1535 case MQTT_FHDR_MSG_TYPE_PINGRESP:
1536 handle_pingresp(conn);
/* PUBREC, PUBREL and PUBCOMP are not handled; the application is told via
 * MQTT_EVENT_NOT_IMPLEMENTED_ERROR. */
1540 case MQTT_FHDR_MSG_TYPE_PUBREC:
1541 case MQTT_FHDR_MSG_TYPE_PUBREL:
1542 case MQTT_FHDR_MSG_TYPE_PUBCOMP:
1543 call_event(conn, MQTT_EVENT_NOT_IMPLEMENTED_ERROR, NULL);
1544 PRINTF(
"MQTT - Got unhandled MQTT Message Type '%i'",
1545 (conn->in_packet.fhdr & 0xF0));
/* DISCONNECT and AUTH are only dispatched when building for MQTT v5 or
 * later. */
#if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_5 1549 case MQTT_FHDR_MSG_TYPE_DISCONNECT:
1550 handle_disconnect(conn);
1553 case MQTT_FHDR_MSG_TYPE_AUTH:
1560 PRINTF(
"MQTT - Got MQTT Message Type '%i'", (conn->in_packet.fhdr & 0xF0));
/* Mark the packet as done so that the next call resets in_packet before
 * parsing. */
1564 conn->in_packet.packet_received = 1;
/*
 * TCP socket event callback. 'ptr' is the struct mqtt_connection that owns
 * the socket; 'event' is the socket event being reported.
 */
1573 tcp_event(
struct tcp_socket *s,
void *ptr, tcp_socket_event_t event)
1575 struct mqtt_connection *conn = ptr;
/* Connection lost (closed, timed out or aborted): post an abort event to
 * the MQTT process, mark the connection as not connected, notify the
 * application with a pointer to the TCP event, and abort the connection. */
1581 case TCP_SOCKET_CLOSED:
1582 case TCP_SOCKET_TIMEDOUT:
1583 case TCP_SOCKET_ABORTED: {
1585 DBG(
"MQTT - Disconnected by tcp event %d\n", event);
1586 process_post(&mqtt_process, mqtt_abort_now_event, conn);
1587 conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
1589 call_event(conn, MQTT_EVENT_DISCONNECTED, &event);
1590 abort_connection(conn);
/* NOTE(review): the body of the auto_reconnect branch is not visible in
 * this listing. */
1593 if(conn->auto_reconnect == 1) {
/* TCP is up: record the state, mark the output buffer as free, and ask the
 * MQTT process to start the MQTT-level connect. */
1598 case TCP_SOCKET_CONNECTED: {
1599 conn->state = MQTT_CONN_STATE_TCP_CONNECTED;
1600 conn->out_buffer_sent = 1;
1602 process_post(&mqtt_process, mqtt_do_connect_mqtt_event, conn);
/* Once the socket reports no pending output data, flag the output buffer
 * as sent and rewind the write pointer to the start of the buffer. */
1605 case TCP_SOCKET_DATA_SENT: {
1606 DBG(
"MQTT - Got TCP_DATA_SENT\n");
1608 if(conn->socket.output_data_len == 0) {
1609 conn->out_buffer_sent = 1;
1610 conn->out_buffer_ptr = conn->out_buffer;
/* Any other event is only logged. */
1618 DBG(
"MQTT - TCP Event %d is currently not managed by the tcp event callback\n",
/*
 * Body of the MQTT engine's event-handling process (the process header
 * line is not part of this listing; other code posts to it as
 * &mqtt_process). Each received event 'ev' selects one of the branches
 * below. The outgoing-packet branches initialise conn->out_proto_thread
 * and then drive the matching *_pt protothread, waiting with
 * PT_MQTT_WAIT_SEND() between steps, until it returns PT_EXITED or higher
 * or the branch's state condition no longer holds.
 * NOTE(review): the assignment of 'conn' from the event data is not
 * visible in this listing.
 */
1626 static struct mqtt_connection *conn;
/* Immediate abort requested (posted by tcp_event on connection loss). */
1633 if(ev == mqtt_abort_now_event) {
1634 DBG(
"MQTT - Abort\n");
1636 conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE;
1638 abort_connection(conn);
1640 if(ev == mqtt_do_connect_tcp_event) {
1642 DBG(
"MQTT - Got mqtt_do_connect_tcp_event!\n");
/* TCP is connected: apply the configured maximum segment size to the
 * socket and run the CONNECT protothread, stopping early if an immediate
 * abort is flagged. */
1645 if(ev == mqtt_do_connect_mqtt_event) {
1647 conn->socket.output_data_max_seg = conn->max_segment_size;
1648 DBG(
"MQTT - Got mqtt_do_connect_mqtt_event!\n");
1650 if(conn->out_buffer_sent == 1) {
1651 PT_INIT(&conn->out_proto_thread);
1652 while(connect_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1653 conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) {
1654 PT_MQTT_WAIT_SEND();
/* Disconnect: only acted upon in the SENDING_MQTT_DISCONNECT state. When
 * the output buffer is free, the DISCONNECT protothread is run, the
 * connection is aborted and the application is notified.
 * NOTE(review): the re-post at listing line 1673 is presumably the else
 * branch taken while the output buffer is still busy -- confirm. */
1658 if(ev == mqtt_do_disconnect_mqtt_event) {
1660 DBG(
"MQTT - Got mqtt_do_disconnect_mqtt_event!\n");
1663 if(conn->state == MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT) {
1664 if(conn->out_buffer_sent == 1) {
1665 PT_INIT(&conn->out_proto_thread);
1666 while(conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE &&
1667 disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1668 PT_MQTT_WAIT_SEND();
1670 abort_connection(conn);
1671 call_event(conn, MQTT_EVENT_DISCONNECTED, &ev);
1673 process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
/* PINGREQ, SUBSCRIBE, UNSUBSCRIBE and PUBLISH all require a free output
 * buffer and the CONNECTED_TO_BROKER state, and stop driving their
 * protothread as soon as that state is left. */
1677 if(ev == mqtt_do_pingreq_event) {
1679 DBG(
"MQTT - Got mqtt_do_pingreq_event!\n");
1681 if(conn->out_buffer_sent == 1 &&
1682 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1683 PT_INIT(&conn->out_proto_thread);
1684 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1685 pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1686 PT_MQTT_WAIT_SEND();
1690 if(ev == mqtt_do_subscribe_event) {
1692 DBG(
"MQTT - Got mqtt_do_subscribe_mqtt_event!\n");
1694 if(conn->out_buffer_sent == 1 &&
1695 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1696 PT_INIT(&conn->out_proto_thread);
1697 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1698 subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1699 PT_MQTT_WAIT_SEND();
1703 if(ev == mqtt_do_unsubscribe_event) {
1705 DBG(
"MQTT - Got mqtt_do_unsubscribe_mqtt_event!\n");
1707 if(conn->out_buffer_sent == 1 &&
1708 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1709 PT_INIT(&conn->out_proto_thread);
1710 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1711 unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1712 PT_MQTT_WAIT_SEND();
1716 if(ev == mqtt_do_publish_event) {
1718 DBG(
"MQTT - Got mqtt_do_publish_mqtt_event!\n");
1720 if(conn->out_buffer_sent == 1 &&
1721 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1722 PT_INIT(&conn->out_proto_thread);
1723 while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1724 publish_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1725 PT_MQTT_WAIT_SEND();
/* AUTH only requires a free output buffer; unlike the branches above it
 * has no connection-state condition. */
1730 if(ev == mqtt_do_auth_event) {
1732 DBG(
"MQTT - Got mqtt_do_auth_event!\n");
1734 if(conn->out_buffer_sent == 1) {
1735 PT_INIT(&conn->out_proto_thread);
1736 while(auth_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1737 PT_MQTT_WAIT_SEND();
/* Drop the reference to the outgoing property list. */
1742 conn->out_props = NULL;
/*
 * Fragment of the engine initialisation routine (its header and most of
 * its body are not part of this listing). 'inited' starts at 0 and is
 * presumably a run-once guard -- confirm. mqtt_event_min and
 * mqtt_event_max are set to the connect-TCP and abort-now events
 * respectively.
 */
1751 static uint8_t inited = 0;
1754 mqtt_event_min = mqtt_do_connect_tcp_event;
1764 mqtt_event_max = mqtt_abort_now_event;
/*
 * mqtt_register(): prepares a connection structure for use.
 *
 * The whole structure is zeroed, then the client id, event callback,
 * owning application process and maximum segment size are stored.
 * auto_reconnect is switched on and srv_feature_en is set to -1 before
 * reset_defaults() is called.
 *
 * When MQTT_31 is set or MQTT_SRV_SUPPORTS_EMPTY_CLIENT_ID is not, an
 * empty client_id is rejected with MQTT_STATUS_INVALID_ARGS_ERROR.
 * client_id is passed to strlen() in that check, so it must not be NULL.
 *
 * Returns MQTT_STATUS_OK on success.
 */
1779 uint16_t max_segment_size)
#if MQTT_31 || !MQTT_SRV_SUPPORTS_EMPTY_CLIENT_ID 1782 if(strlen(client_id) < 1) {
1783 return MQTT_STATUS_INVALID_ARGS_ERROR;
/* Start from a clean slate; every field not set below stays zero. */
1788 memset(conn, 0,
sizeof(
struct mqtt_connection));
1791 conn->srv_feature_en = -1;
1793 string_to_mqtt_string(&conn->client_id, client_id);
1794 conn->event_callback = event_callback;
1795 conn->app_process = app_process;
1796 conn->auto_reconnect = 1;
1797 conn->max_segment_size = max_segment_size;
1799 reset_defaults(conn);
1805 DBG(
"MQTT - Registered successfully\n");
1807 return MQTT_STATUS_OK;
/*
 * mqtt_connect(): starts connecting to a broker.
 *
 * Two parameter-list tails are visible below (with and without
 * prop_list); presumably they are selected by a protocol-version
 * conditional that is not part of this listing.
 *
 * If the connection state is already beyond MQTT_CONN_STATE_NOT_CONNECTED
 * the call returns MQTT_STATUS_OK without changing anything. Otherwise
 * the host, port and keep-alive are recorded, the output buffer pointer
 * is rewound, and mqtt_do_connect_tcp_event is posted to the MQTT
 * process. The 'host' pointer itself is stored, not a copy of the string.
 *
 * Returns MQTT_STATUS_ERROR when uiplib_ip6addrconv() returns 0 for
 * 'host', and MQTT_STATUS_OK otherwise.
 */
1817 uint16_t keep_alive,
1819 uint8_t clean_session,
1820 struct mqtt_prop_list *prop_list)
1822 uint8_t clean_session)
1825 uip_ip6addr_t ip6addr;
1830 if(conn->state > MQTT_CONN_STATE_NOT_CONNECTED) {
1831 return MQTT_STATUS_OK;
1834 conn->server_host = host;
1835 conn->keep_alive = keep_alive;
1836 conn->server_port = port;
1837 conn->out_buffer_ptr = conn->out_buffer;
1838 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
/* A zero-length client id forces the clean-session flag even when the
 * caller did not ask for it. */
1841 if(clean_session || (conn->client_id.length == 0)) {
1842 conn->connect_vhdr_flags |= MQTT_VHDR_CLEAN_SESSION_FLAG;
/* Validate that 'host' converts to an IPv6 address before going further. */
1846 if(uiplib_ip6addrconv(host, &ip6addr) == 0) {
1847 return MQTT_STATUS_ERROR;
1858 conn->out_props = prop_list;
1861 process_post(&mqtt_process, mqtt_do_connect_tcp_event, conn);
1863 return MQTT_STATUS_OK;
/*
 * mqtt_disconnect(): requests an MQTT-level disconnect.
 *
 * The visible code checks for MQTT_CONN_STATE_CONNECTED_TO_BROKER, moves
 * the connection to MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT, stores
 * prop_list as the outgoing properties, and posts
 * mqtt_do_disconnect_mqtt_event to the MQTT process.
 * NOTE(review): the body of the state check is not visible in this
 * listing; presumably it returns early when not connected -- confirm.
 */
1869 struct mqtt_prop_list *prop_list)
1874 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1878 conn->state = MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT;
1881 conn->out_props = prop_list;
1884 process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
/*
 * mqtt_subscribe(): queues a SUBSCRIBE request for 'topic'.
 *
 * Two parameter-list tails are visible below (with and without the
 * subscription options and prop_list); presumably they are selected by a
 * protocol-version conditional that is not part of this listing.
 *
 * Returns MQTT_STATUS_NOT_CONNECTED_ERROR unless the connection is in
 * MQTT_CONN_STATE_CONNECTED_TO_BROKER, MQTT_STATUS_OUT_QUEUE_FULL when a
 * request is already pending (out_queue_full set), and MQTT_STATUS_OK
 * once the request is recorded and mqtt_do_subscribe_event is posted.
 * The 'topic' pointer is stored as-is and passed to strlen(), so it must
 * be a valid NUL-terminated string.
 */
1890 mqtt_qos_level_t qos_level,
1891 mqtt_nl_en_t nl, mqtt_rap_en_t rap,
1892 mqtt_retain_handling_t ret_handling,
1893 struct mqtt_prop_list *prop_list)
1895 mqtt_qos_level_t qos_level)
1898 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1899 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1902 DBG(
"MQTT - Call to mqtt_subscribe...\n");
/* out_queue_full acts as a single-slot "request pending" marker. */
1905 if(conn->out_queue_full) {
1906 DBG(
"MQTT - Not accepted!\n");
1907 return MQTT_STATUS_OUT_QUEUE_FULL;
1909 conn->out_queue_full = 1;
1910 DBG(
"MQTT - Accepted!\n");
1912 conn->out_packet.mid = INCREMENT_MID(conn);
1913 conn->out_packet.topic = topic;
1914 conn->out_packet.topic_length = strlen(topic);
1915 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
/* Report the message id chosen for this request back to the caller.
 * NOTE(review): any NULL guard around this store is not visible in this
 * listing. */
1918 *mid = conn->out_packet.mid;
/* Pack the subscription options into one byte, each value masked to its
 * own option field. */
1922 conn->out_packet.sub_options = 0x00;
1923 conn->out_packet.sub_options |= qos_level & MQTT_SUB_OPTION_QOS;
1924 conn->out_packet.sub_options |= nl & MQTT_SUB_OPTION_NL;
1925 conn->out_packet.sub_options |= rap & MQTT_SUB_OPTION_RAP;
1926 conn->out_packet.sub_options |= ret_handling & MQTT_SUB_OPTION_RETAIN_HANDLING;
1928 conn->out_packet.qos = qos_level;
1932 conn->out_props = prop_list;
1935 process_post(&mqtt_process, mqtt_do_subscribe_event, conn);
1936 return MQTT_STATUS_OK;
/*
 * mqtt_unsubscribe(): queues an UNSUBSCRIBE request for 'topic'.
 *
 * Returns MQTT_STATUS_NOT_CONNECTED_ERROR unless the connection is in
 * MQTT_CONN_STATE_CONNECTED_TO_BROKER, MQTT_STATUS_OUT_QUEUE_FULL when a
 * request is already pending (out_queue_full set), and MQTT_STATUS_OK
 * once the request is recorded and mqtt_do_unsubscribe_event is posted.
 * The 'topic' pointer is stored as-is and passed to strlen(), so it must
 * be a valid NUL-terminated string.
 */
1943 struct mqtt_prop_list *prop_list)
1948 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1949 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1952 DBG(
"MQTT - Call to mqtt_unsubscribe...\n");
/* out_queue_full acts as a single-slot "request pending" marker. */
1954 if(conn->out_queue_full) {
1955 DBG(
"MQTT - Not accepted!\n");
1956 return MQTT_STATUS_OUT_QUEUE_FULL;
1958 conn->out_queue_full = 1;
1959 DBG(
"MQTT - Accepted!\n");
1961 conn->out_packet.mid = INCREMENT_MID(conn);
1962 conn->out_packet.topic = topic;
1963 conn->out_packet.topic_length = strlen(topic);
1964 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
/* Report the message id chosen for this request back to the caller.
 * NOTE(review): any NULL guard around this store is not visible in this
 * listing. */
1967 *mid = conn->out_packet.mid;
1971 conn->out_props = prop_list;
1974 process_post(&mqtt_process, mqtt_do_unsubscribe_event, conn);
1975 return MQTT_STATUS_OK;
/*
 * mqtt_publish(): queues a PUBLISH of 'payload' (payload_size bytes).
 *
 * Two parameter-list tails are visible below (with and without the topic
 * alias arguments and prop_list); presumably they are selected by a
 * protocol-version conditional that is not part of this listing.
 *
 * Returns MQTT_STATUS_NOT_CONNECTED_ERROR unless the connection is in
 * MQTT_CONN_STATE_CONNECTED_TO_BROKER, MQTT_STATUS_OUT_QUEUE_FULL when a
 * request is already pending (out_queue_full set), and MQTT_STATUS_OK
 * once the request is recorded and mqtt_do_publish_event is posted.
 * The topic and payload pointers are stored as-is, not copied.
 */
1980 uint8_t *payload, uint32_t payload_size,
1981 mqtt_qos_level_t qos_level,
1983 mqtt_retain_t retain,
1984 uint8_t topic_alias, mqtt_topic_alias_en_t topic_alias_en,
1985 struct mqtt_prop_list *prop_list)
1987 mqtt_retain_t retain)
1990 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1991 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1994 DBG(
"MQTT - Call to mqtt_publish...\n");
/* out_queue_full acts as a single-slot "request pending" marker. */
1997 if(conn->out_queue_full) {
1998 DBG(
"MQTT - Not accepted!\n");
1999 return MQTT_STATUS_OUT_QUEUE_FULL;
2001 conn->out_queue_full = 1;
2002 DBG(
"MQTT - Accepted!\n");
2004 conn->out_packet.mid = INCREMENT_MID(conn);
2005 conn->out_packet.retain = retain;
/* With topic aliasing enabled, the packet carries an empty topic and the
 * alias value; an alias of 0 is only reported through the debug log. */
2007 if(topic_alias_en == MQTT_TOPIC_ALIAS_ON) {
2008 conn->out_packet.topic =
"";
2009 conn->out_packet.topic_length = 0;
2010 conn->out_packet.topic_alias = topic_alias;
2011 if(topic_alias == 0) {
2012 DBG(
"MQTT - Error, a topic alias of 0 is not permitted! It won't be sent.\n");
/* Presumably the else branch (aliasing off): send the full topic and no
 * alias. The 'else' itself is not visible in this listing. */
2015 conn->out_packet.topic = topic;
2016 conn->out_packet.topic_length = strlen(topic);
2017 conn->out_packet.topic_alias = 0;
/* Presumably the variant built when topic aliases are unavailable
 * -- confirm against the full file. */
2020 conn->out_packet.topic = topic;
2021 conn->out_packet.topic_length = strlen(topic);
2023 conn->out_packet.payload = payload;
2024 conn->out_packet.payload_size = payload_size;
2025 conn->out_packet.qos = qos_level;
2026 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
/* Report the message id chosen for this request back to the caller.
 * NOTE(review): any NULL guard around this store is not visible in this
 * listing. */
2029 *mid = conn->out_packet.mid;
2033 conn->out_props = prop_list;
2036 process_post(&mqtt_process, mqtt_do_publish_event, conn);
2037 return MQTT_STATUS_OK;
/*
 * mqtt_set_username_password(): records the credentials on the connection
 * and keeps the CONNECT flags in step with them. Each of the
 * MQTT_VHDR_USERNAME_FLAG and MQTT_VHDR_PASSWORD_FLAG bits is set when
 * the corresponding argument is non-NULL. The bit-clearing statements are
 * presumably the matching else branches (the 'else' lines are not visible
 * in this listing).
 */
2045 string_to_mqtt_string(&conn->credentials.username, username);
2046 string_to_mqtt_string(&conn->credentials.password, password);
2049 if(username != NULL) {
2050 conn->connect_vhdr_flags |= MQTT_VHDR_USERNAME_FLAG;
2052 conn->connect_vhdr_flags &= ~MQTT_VHDR_USERNAME_FLAG;
2054 if(password != NULL) {
2055 conn->connect_vhdr_flags |= MQTT_VHDR_PASSWORD_FLAG;
2057 conn->connect_vhdr_flags &= ~MQTT_VHDR_PASSWORD_FLAG;
/*
 * mqtt_set_last_will(): stores the will topic, message and QoS on the
 * connection and enables the will in the CONNECT flags.
 *
 * Two parameter-list tails are visible below (with and without
 * will_props); presumably they are selected by a protocol-version
 * conditional that is not part of this listing.
 *
 * Both MQTT_VHDR_WILL_FLAG and MQTT_VHDR_WILL_RETAIN_FLAG are set
 * unconditionally, so the visible code always marks the will as retained.
 */
2064 mqtt_qos_level_t qos,
struct mqtt_prop_list *will_props)
2066 mqtt_qos_level_t qos)
2070 string_to_mqtt_string(&conn->will.topic, topic);
2071 string_to_mqtt_string(&conn->will.message, message);
2074 conn->will.qos = qos;
2077 conn->connect_vhdr_flags |= MQTT_VHDR_WILL_FLAG |
2078 MQTT_VHDR_WILL_RETAIN_FLAG;
/* The property list pointer is stored through a cast to list_t. */
2081 conn->will.properties = (
list_t)will_props;
/*
 * mqtt_auth(): prepares an outgoing AUTH packet.
 *
 * The fixed header is set to MQTT_FHDR_MSG_TYPE_AUTH with a remaining
 * length of 1. The reason code is MQTT_VHDR_RC_CONTINUE_AUTH (0x18) plus
 * auth_type; with MQTT_VHDR_RC_REAUTH defined as 0x19, an auth_type of 1
 * yields the re-authenticate code. prop_list is stored as the outgoing
 * properties.
 * NOTE(review): the statement that schedules the send is not visible in
 * this listing.
 *
 * Returns MQTT_STATUS_OK.
 */
2096 mqtt_auth_type_t auth_type,
2097 struct mqtt_prop_list *prop_list)
2099 DBG(
"MQTT - Call to mqtt_auth...\n");
2101 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_AUTH;
2102 conn->out_packet.remaining_length = 1;
2103 conn->out_packet.auth_reason_code = MQTT_VHDR_RC_CONTINUE_AUTH + auth_type;
2105 conn->out_props = prop_list;
2108 return MQTT_STATUS_OK;
static uip_ipaddr_t ipaddr
Pointer to prefix information option in uip_buf.
mqtt_event_t
MQTT engine events.
mqtt_status_t mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic, struct mqtt_prop_list *prop_list)
Unsubscribes from a MQTT topic.
void timer_set(struct timer *t, clock_time_t interval)
Set a timer.
#define PROCESS(name, strname)
Declare a process.
Protothreads implementation.
void ctimer_stop(struct ctimer *c)
Stop a pending callback timer.
void mqtt_disconnect(struct mqtt_connection *conn, struct mqtt_prop_list *prop_list)
Disconnects from a MQTT broker.
#define PROCESS_WAIT_EVENT()
Wait for an event to be posted to the process.
#define PROCESS_BEGIN()
Define the beginning of a process.
#define PROCESS_END()
Define the end of a process.
mqtt_status_t mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic, mqtt_qos_level_t qos_level, mqtt_nl_en_t nl, mqtt_rap_en_t rap, mqtt_retain_handling_t ret_handling, struct mqtt_prop_list *prop_list)
Subscribes to a MQTT topic.
void ** list_t
The linked list type.
#define PT_BEGIN(pt)
Declare the start of a protothread inside the C function implementing the protothread.
#define PT_WAIT_UNTIL(pt, condition)
Block and wait until condition is true.
void(* mqtt_event_callback_t)(struct mqtt_connection *m, mqtt_event_t event, void *data)
MQTT event callback function.
mqtt_status_t mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port, uint16_t keep_alive, uint8_t clean_session, struct mqtt_prop_list *prop_list)
Connects to a MQTT broker.
Header file for IPv6-related data structures.
#define PT_INIT(pt)
Initialize a protothread.
mqtt_status_t mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic, uint8_t *payload, uint32_t payload_size, mqtt_qos_level_t qos_level, mqtt_retain_t retain, uint8_t topic_alias, mqtt_topic_alias_en_t topic_alias_en, struct mqtt_prop_list *prop_list)
Publish to a MQTT topic.
#define CLOCK_SECOND
A second, measured in system clock time.
Header file for the callback timer.
#define PT_END(pt)
Declare the end of a protothread.
Linked list manipulation routines.
void mqtt_set_username_password(struct mqtt_connection *conn, char *username, char *password)
Set the user name and password for a MQTT client.
void * list_head(list_t list)
Get a pointer to the first element of a list.
mqtt_status_t mqtt_auth(struct mqtt_connection *conn, mqtt_auth_type_t auth_type, struct mqtt_prop_list *prop_list)
Send authentication message (MQTTv5-only).
Header file for the Contiki MQTT engine.
void ctimer_set(struct ctimer *c, clock_time_t t, void(*f)(void *), void *ptr)
Set a callback timer.
int timer_expired(struct timer *t)
Check if a timer has expired.
#define uip_ipaddr_copy(dest, src)
Copy an IP address from one place to another.
void mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message, mqtt_qos_level_t qos, struct mqtt_prop_list *will_props)
Set the last will topic and message for a MQTT client.
#define PT_EXIT(pt)
Exit the protothread.
void ctimer_restart(struct ctimer *c)
Restart a callback timer from the current point in time.
#define PT_THREAD(name_args)
Declaration of a protothread.
process_event_t process_alloc_event(void)
Allocate a global event number.
void list_add(list_t list, void *item)
Add an item at the end of a list.
void list_init(list_t list)
Initialize a list.
Header file for the uIP TCP/IP stack.
#define LIST(name)
Declare a linked list.
mqtt_status_t mqtt_register(struct mqtt_connection *conn, struct process *app_process, char *client_id, mqtt_event_callback_t event_callback, uint16_t max_segment_size)
Initializes the MQTT engine.
int process_post(struct process *p, process_event_t ev, process_data_t data)
Post an asynchronous event.
Default definitions of C compiler quirk work-arounds.
PROCESS_THREAD(cc2538_rf_process, ev, data)
Implementation of the cc2538 RF driver process.
Header file for the LED HAL.
void * list_item_next(void *item)
Get the next item following this item.
void process_start(struct process *p, process_data_t data)
Start a process.