Contiki-NG
mqtt.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015, Texas Instruments Incorporated - http://www.ti.com/
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the copyright holder nor the names of its
14  * contributors may be used to endorse or promote products derived
15  * from this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
20  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
21  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
22  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
26  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
28  * OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30 /*---------------------------------------------------------------------------*/
31 /**
32  * \addtogroup mqtt-engine
33  * @{
34  */
35 /**
36  * \file
37  * Implementation of the Contiki MQTT engine
38  *
39  * \author
40  * Texas Instruments
41  */
42 /*---------------------------------------------------------------------------*/
43 #include "mqtt.h"
44 #include "mqtt-prop.h"
45 #include "contiki.h"
46 #include "contiki-net.h"
47 #include "contiki-lib.h"
48 #include "lib/random.h"
49 #include "sys/ctimer.h"
50 #include "sys/etimer.h"
51 #include "sys/pt.h"
52 #include "net/ipv6/uip.h"
53 #include "net/ipv6/uip-ds6.h"
54 #include "dev/leds.h"
55 
56 #include "tcp-socket.h"
57 
58 #include "lib/assert.h"
59 #include "lib/list.h"
60 #include "sys/cc.h"
61 
62 #include <stdlib.h>
63 #include <stdio.h>
64 #include <string.h>
65 #include <stdarg.h>
66 /*---------------------------------------------------------------------------*/
/* Set DEBUG to a non-zero value to forward PRINTF() to printf(). */
#define DEBUG 0
#if !DEBUG
/* Debug output disabled: PRINTF() expands to nothing. */
#define PRINTF(...)
#else
#define PRINTF(...) printf(__VA_ARGS__)
#endif
73 /*---------------------------------------------------------------------------*/
/* Flag bits carried in the low nibble of the MQTT fixed header. */
typedef enum {
  /* Bit 3: duplicate delivery */
  MQTT_FHDR_DUP_FLAG = 1 << 3,

  /* Bits 2..1: QoS level */
  MQTT_FHDR_QOS_LEVEL_0 = 0 << 1,
  MQTT_FHDR_QOS_LEVEL_1 = 1 << 1,
  MQTT_FHDR_QOS_LEVEL_2 = 2 << 1,

  /* Bit 0: retain */
  MQTT_FHDR_RETAIN_FLAG = 1 << 0,
} mqtt_fhdr_fields_t;
83 /*---------------------------------------------------------------------------*/
/* Bits of the "connect flags" byte in the CONNECT variable header. */
typedef enum {
  MQTT_VHDR_USERNAME_FLAG = 1 << 7,
  MQTT_VHDR_PASSWORD_FLAG = 1 << 6,

  MQTT_VHDR_WILL_RETAIN_FLAG = 1 << 5,
  /* Bits 4..3: QoS level of the will message */
  MQTT_VHDR_WILL_QOS_LEVEL_0 = 0 << 3,
  MQTT_VHDR_WILL_QOS_LEVEL_1 = 1 << 3,
  MQTT_VHDR_WILL_QOS_LEVEL_2 = 2 << 3,

  MQTT_VHDR_WILL_FLAG = 1 << 2,
  MQTT_VHDR_CLEAN_SESSION_FLAG = 1 << 1, /* called Clean Start in MQTTv5.0 */
} mqtt_vhdr_conn_fields_t;
96 /*---------------------------------------------------------------------------*/
/* Return codes carried in the CONNACK variable header. */
typedef enum {
  MQTT_VHDR_CONN_ACCEPTED = 0,
  MQTT_VHDR_CONN_REJECTED_PROTOCOL = 1,
  MQTT_VHDR_CONN_REJECTED_IDENTIFIER = 2,
  MQTT_VHDR_CONN_REJECTED_UNAVAILABLE = 3,
  MQTT_VHDR_CONN_REJECTED_BAD_USER_PASS = 4,
  MQTT_VHDR_CONN_REJECTED_UNAUTHORIZED = 5,
} mqtt_vhdr_connack_ret_code_t;
105 
/* Flag bits of the CONNACK acknowledge-flags byte. */
typedef enum {
  MQTT_VHDR_CONNACK_SESSION_PRESENT = 1 << 0
} mqtt_vhdr_connack_flags_t;
109 
110 /*---------------------------------------------------------------------------*/
/*
 * SUBACK return codes (MQTT 3.1.1, section 3.9.3).
 *
 * The three success codes report the maximum QoS granted by the broker.
 * Failure is signalled with 0x80 (most significant bit set), not 0x08.
 */
typedef enum {
  MQTT_SUBACK_RET_QOS_0 = 0x00,
  MQTT_SUBACK_RET_QOS_1 = 0x01,
  MQTT_SUBACK_RET_QOS_2 = 0x02,
  MQTT_SUBACK_RET_FAIL = 0x80,
} mqtt_suback_ret_code_t;
117 /*---------------------------------------------------------------------------*/
/*
 * MQTTv5.0 Reason Codes.
 *
 * Values below 0x80 are the non-error codes; values from 0x80 upwards are
 * the error codes.
 *
 * NOTE(review): the MQTT v5.0 specification lists "Granted QoS 0/1/2" as
 * 0x00/0x01/0x02, whereas the QOS_* entries here are 0x01/0x02/0x03.
 * Confirm against the users of these constants before changing them.
 */
typedef enum {
  /* Success / informational */
  MQTT_VHDR_RC_SUCCES_OR_NORMAL = 0x00,
  MQTT_VHDR_RC_QOS_0 = 0x01,
  MQTT_VHDR_RC_QOS_1 = 0x02,
  MQTT_VHDR_RC_QOS_2 = 0x03,
  MQTT_VHDR_RC_DISC_WITH_WILL = 0x04,
  MQTT_VHDR_RC_NO_MATCH_SUB = 0x10,
  MQTT_VHDR_RC_NO_SUB_EXISTED = 0x11,
  MQTT_VHDR_RC_CONTINUE_AUTH = 0x18,
  MQTT_VHDR_RC_REAUTH = 0x19,

  /* Errors */
  MQTT_VHDR_RC_UNSPEC_ERR = 0x80,
  MQTT_VHDR_RC_MALFORMED_PKT = 0x81,
  MQTT_VHDR_RC_PROTOCOL_ERR = 0x82,
  MQTT_VHDR_RC_IMPL_SPEC_ERR = 0x83,
  MQTT_VHDR_RC_PROT_VER_UNUSUPPORTED = 0x84,
  MQTT_VHDR_RC_CLIENT_ID_INVALID = 0x85,
  MQTT_VHDR_RC_BAD_USER_PASS = 0x86,
  MQTT_VHDR_RC_NOT_AUTH = 0x87,
  MQTT_VHDR_RC_SRV_UNAVAIL = 0x88,
  MQTT_VHDR_RC_SRV_BUSY = 0x89,
  MQTT_VHDR_RC_BANNED = 0x8A,
  MQTT_VHDR_RC_SRV_SHUTDOWN = 0x8B,
  MQTT_VHDR_RC_BAD_AUTH_METHOD = 0x8C,
  MQTT_VHDR_RC_KEEP_ALIVE_TIMEOUT = 0x8D,
  MQTT_VHDR_RC_SESS_TAKEN_OVER = 0x8E,
  MQTT_VHDR_RC_TOPIC_FILT_INVAL = 0x8F,
  MQTT_VHDR_RC_TOPIC_NAME_INVAL = 0x90,
  MQTT_VHDR_RC_PKT_ID_IN_USE = 0x91,
  MQTT_VHDR_RC_PKT_ID_NOT_FOUND = 0x92,
  MQTT_VHDR_RC_RECV_MAX_EXCEEDED = 0x93,
  MQTT_VHDR_RC_TOPIC_ALIAS_INVAL = 0x94,
  MQTT_VHDR_RC_PKT_TOO_LARGE = 0x95,
  MQTT_VHDR_RC_MSG_RATE_TOO_HIGH = 0x96,
  MQTT_VHDR_RC_QUOTA_EXCEEDED = 0x97,
  MQTT_VHDR_RC_ADMIN_ACTION = 0x98,
  MQTT_VHDR_RC_PAYLD_FMT_INVAL = 0x99,
  MQTT_VHDR_RC_RETAIN_UNSUPPORTED = 0x9A,
  MQTT_VHDR_RC_QOS_UNSUPPORTED = 0x9B,
  MQTT_VHDR_RC_USE_ANOTHER_SRV = 0x9C,
  MQTT_VHDR_RC_SRV_MOVED = 0x9D,
  MQTT_VHDR_RC_SHARED_SUB_UNSUPPORTED = 0x9E,
  MQTT_VHDR_RC_CONN_RATE_EXCEEDED = 0x9F,
  MQTT_VHDR_RC_MAX_CONN_TIME = 0xA0,
  MQTT_VHDR_RC_SUB_ID_UNSUPPORTED = 0xA1,
  MQTT_VHDR_RC_WILD_SUB_UNSUPPORTED = 0xA2,
} mqtt_reason_code_t;
165 /*---------------------------------------------------------------------------*/
/* Time allowed for the broker to answer a request: 10 * CLOCK_SECOND ticks */
#define RESPONSE_WAIT_TIMEOUT (CLOCK_SECOND * 10)
/*---------------------------------------------------------------------------*/
/*
 * Advance the connection's message-ID counter by two.
 * The expansion is fully parenthesised so that the macro behaves as a single
 * expression wherever it is used.
 */
#define INCREMENT_MID(conn) ((conn)->mid_counter += 2)
/*
 * Number of bytes an MQTT string occupies on the wire: 0 for an empty
 * string, otherwise the length prefix plus the string bytes.
 */
#define MQTT_STRING_LENGTH(s) (((s)->length) == 0 ? 0 : (MQTT_STRING_LEN_SIZE + (s)->length))
170 /*---------------------------------------------------------------------------*/
/*
 * Protothread send macros.
 *
 * Both macros must be used inside a protothread whose 'struct pt *' is named
 * 'pt'. Whenever the write helper reports that not everything could be
 * buffered, the protothread waits until conn->out_buffer_sent is set and
 * then retries.
 *
 * The bodies are wrapped in do { } while(0) and the arguments parenthesised
 * so each macro expands to exactly one statement and is safe in an unbraced
 * if/else.
 */
#define PT_MQTT_WRITE_BYTES(conn, data, len) \
  do { \
    (conn)->out_write_pos = 0; \
    while(write_bytes((conn), (data), (len))) { \
      PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \
    } \
  } while(0)

#define PT_MQTT_WRITE_BYTE(conn, data) \
  do { \
    while(write_byte((conn), (data))) { \
      PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \
    } \
  } while(0)
182 /*---------------------------------------------------------------------------*/
/*
 * Post the continue-send event to the current process and wait until it is
 * delivered.
 *
 * PROCESS_PAUSE() cannot be used here, because we would risk losing any
 * events posted while the send is in progress. Instead, every event in the
 * [mqtt_event_min, mqtt_event_max] range that arrives while waiting is
 * re-posted to the current process. An abort event additionally switches the
 * connection to MQTT_CONN_STATE_ABORT_IMMEDIATE and resets the outgoing
 * protothread before being re-posted.
 *
 * Relies on 'conn', 'ev' and 'data' being in scope at the point of use.
 */
#define PT_MQTT_WAIT_SEND() \
  do { \
    if(process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL) == \
       PROCESS_ERR_OK) { \
      do { \
        PROCESS_WAIT_EVENT(); \
        if(ev == mqtt_abort_now_event) { \
          conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \
          PT_INIT(&conn->out_proto_thread); \
        } \
        if(ev == mqtt_abort_now_event || \
           (ev >= mqtt_event_min && ev <= mqtt_event_max)) { \
          process_post(PROCESS_CURRENT(), ev, data); \
        } \
      } while(ev != mqtt_continue_send_event); \
    } \
  } while(0)
205 /*---------------------------------------------------------------------------*/
206 static process_event_t mqtt_do_connect_tcp_event;
207 static process_event_t mqtt_do_connect_mqtt_event;
208 static process_event_t mqtt_do_disconnect_mqtt_event;
209 static process_event_t mqtt_do_subscribe_event;
210 static process_event_t mqtt_do_unsubscribe_event;
211 static process_event_t mqtt_do_publish_event;
212 static process_event_t mqtt_do_pingreq_event;
213 static process_event_t mqtt_continue_send_event;
214 static process_event_t mqtt_abort_now_event;
215 static process_event_t mqtt_do_auth_event;
216 process_event_t mqtt_update_event;
217 
218 /*
219  * Min and Max event numbers we want to acknowledge while we're in the process
220  * of doing something else. continue_send does not count, therefore must be
221  * allocated last
222  */
223 static process_event_t mqtt_event_min;
224 static process_event_t mqtt_event_max;
225 /*---------------------------------------------------------------------------*/
226 /* Prototypes */
227 static int
228 tcp_input(struct tcp_socket *s, void *ptr, const uint8_t *input_data_ptr,
229  int input_data_len);
230 
231 static void tcp_event(struct tcp_socket *s, void *ptr,
232  tcp_socket_event_t event);
233 
234 static void reset_packet(struct mqtt_in_packet *packet);
235 /*---------------------------------------------------------------------------*/
236 LIST(mqtt_conn_list);
237 /*---------------------------------------------------------------------------*/
238 PROCESS(mqtt_process, "MQTT process");
239 /*---------------------------------------------------------------------------*/
240 static void
241 call_event(struct mqtt_connection *conn,
242  mqtt_event_t event,
243  void *data)
244 {
245  conn->event_callback(conn, event, data);
246  process_post(conn->app_process, mqtt_update_event, NULL);
247 }
248 /*---------------------------------------------------------------------------*/
249 static void
250 reset_defaults(struct mqtt_connection *conn)
251 {
252  conn->mid_counter = 1;
253  PT_INIT(&conn->out_proto_thread);
254  conn->waiting_for_pingresp = 0;
255 
256  reset_packet(&conn->in_packet);
257  conn->out_buffer_sent = 0;
258 }
259 /*---------------------------------------------------------------------------*/
260 static void
261 abort_connection(struct mqtt_connection *conn)
262 {
263  conn->out_buffer_ptr = conn->out_buffer;
264  conn->out_queue_full = 0;
265 
266  /* Reset outgoing packet */
267  memset(&conn->out_packet, 0, sizeof(conn->out_packet));
268 
269  tcp_socket_close(&conn->socket);
270  tcp_socket_unregister(&conn->socket);
271 
272  memset(&conn->socket, 0, sizeof(conn->socket));
273 
274  conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
275 }
276 /*---------------------------------------------------------------------------*/
277 static void
278 connect_tcp(struct mqtt_connection *conn)
279 {
280  conn->state = MQTT_CONN_STATE_TCP_CONNECTING;
281 
282  reset_defaults(conn);
283  tcp_socket_register(&(conn->socket),
284  conn,
285  conn->in_buffer,
286  MQTT_TCP_INPUT_BUFF_SIZE,
287  conn->out_buffer,
288  MQTT_TCP_OUTPUT_BUFF_SIZE,
289  tcp_input,
290  tcp_event);
291  tcp_socket_connect(&(conn->socket), &(conn->server_ip), conn->server_port);
292 }
293 /*---------------------------------------------------------------------------*/
294 static void
295 disconnect_tcp(struct mqtt_connection *conn)
296 {
297  conn->state = MQTT_CONN_STATE_DISCONNECTING;
298  tcp_socket_close(&(conn->socket));
299  tcp_socket_unregister(&conn->socket);
300 
301  memset(&conn->socket, 0, sizeof(conn->socket));
302 }
303 /*---------------------------------------------------------------------------*/
304 static void
305 send_out_buffer(struct mqtt_connection *conn)
306 {
307  if(conn->out_buffer_ptr - conn->out_buffer == 0) {
308  conn->out_buffer_sent = 1;
309  return;
310  }
311  conn->out_buffer_sent = 0;
312 
313  DBG("MQTT - (send_out_buffer) Space used in buffer: %i\n",
314  conn->out_buffer_ptr - conn->out_buffer);
315 
316  tcp_socket_send(&conn->socket, conn->out_buffer,
317  conn->out_buffer_ptr - conn->out_buffer);
318 }
319 /*---------------------------------------------------------------------------*/
320 static void
321 string_to_mqtt_string(struct mqtt_string *mqtt_string, char *string)
322 {
323  if(mqtt_string == NULL) {
324  return;
325  }
326  mqtt_string->string = string;
327 
328  if(string != NULL) {
329  mqtt_string->length = strlen(string);
330  } else {
331  mqtt_string->length = 0;
332  }
333 }
334 /*---------------------------------------------------------------------------*/
335 static int
336 write_byte(struct mqtt_connection *conn, uint8_t data)
337 {
338  DBG("MQTT - (write_byte) buff_size: %i write: '%02X'\n",
339  &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
340  data);
341 
342  if(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr == 0) {
343  send_out_buffer(conn);
344  return 1;
345  }
346 
347  *conn->out_buffer_ptr = data;
348  conn->out_buffer_ptr++;
349  return 0;
350 }
351 /*---------------------------------------------------------------------------*/
352 static int
353 write_bytes(struct mqtt_connection *conn, uint8_t *data, uint16_t len)
354 {
355  uint16_t write_bytes;
356  write_bytes =
357  MIN(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
358  len - conn->out_write_pos);
359 
360  memcpy(conn->out_buffer_ptr, &data[conn->out_write_pos], write_bytes);
361  conn->out_write_pos += write_bytes;
362  conn->out_buffer_ptr += write_bytes;
363 
364  DBG("MQTT - (write_bytes) len: %u write_pos: %i\n", len,
365  conn->out_write_pos);
366 
367  if(len - conn->out_write_pos == 0) {
368  conn->out_write_pos = 0;
369  return 0;
370  } else {
371  send_out_buffer(conn);
372  return len - conn->out_write_pos;
373  }
374 }
375 /*---------------------------------------------------------------------------*/
/*
 * Decode an MQTT "Variable Byte Integer".
 *
 * input_data_ptr  Buffer holding the encoded value.
 * input_data_len  Number of valid bytes in the buffer.
 * input_pos       In/out read offset into the buffer; may be NULL, in which
 *                 case decoding starts at offset 0.
 * pkt_byte_count  Optional counter incremented once per consumed byte; may
 *                 be NULL.
 * dest            The decoded value is ADDED to *dest, so the caller must
 *                 zero it beforehand. As *dest is 16 bits wide, values above
 *                 65535 are truncated.
 *
 * Returns the number of bytes consumed (1..4), or 0 when the input ran out
 * before the value was complete or when the encoding is longer than 4 bytes.
 * In the failure case *input_pos, *pkt_byte_count and *dest may already have
 * been advanced by the bytes consumed so far.
 */
uint8_t
mqtt_decode_var_byte_int(const uint8_t *input_data_ptr,
                         int input_data_len,
                         uint32_t *input_pos,
                         uint32_t *pkt_byte_count,
                         uint16_t *dest)
{
  uint8_t read_bytes = 0;
  uint8_t byte_in;
  /*
   * Must be wider than 8 bits: the multiplier reaches 128 * 128 for the
   * third byte, which an uint8_t would silently wrap to 0.
   */
  uint32_t multiplier = 1;
  uint32_t input_pos_0 = 0;

  if(input_pos == NULL) {
    input_pos = &input_pos_0;
  }

  do {
    /*
     * Reject non-positive lengths explicitly; comparing the unsigned
     * position against a negative int would otherwise never trigger.
     */
    if(input_data_len <= 0 || *input_pos >= (uint32_t)input_data_len) {
      return 0;
    }

    byte_in = input_data_ptr[*input_pos];
    (*input_pos)++;
    if(pkt_byte_count) {
      (*pkt_byte_count)++;
    }
    read_bytes++;
    DBG("MQTT - Read Variable Byte Integer byte %i\n", byte_in);

    if(read_bytes > 4) {
      DBG("Received more than 4 byte 'Variable Byte Integer'.");
      return 0;
    }

    /* The low 7 bits carry data, the top bit flags a continuation byte */
    *dest = (uint16_t)(*dest + (uint32_t)(byte_in & 127) * multiplier);
    multiplier *= 128;
  } while((byte_in & 128) != 0);

  return read_bytes;
}
416 /*---------------------------------------------------------------------------*/
/*
 * Encode 'val' as an MQTT "Variable Byte Integer".
 *
 * vbi_out    Receives the encoded bytes; up to 5 bytes may be written.
 * vbi_bytes  Receives the number of bytes written (1..5).
 * val        Value to encode.
 *
 * Each output byte holds 7 bits of the value, least significant group
 * first; the top bit is set on every byte that is followed by another one.
 * Encoding stops after 5 bytes even if 'val' is not yet exhausted.
 */
void
mqtt_encode_var_byte_int(uint8_t *vbi_out,
                         uint8_t *vbi_bytes,
                         uint32_t val)
{
  uint8_t digit;

  DBG("MQTT - Encoding Variable Byte Integer %u\n", val);

  *vbi_bytes = 0;
  for(;;) {
    digit = val & 0x7F;
    val >>= 7;
    if(val != 0) {
      /* More groups follow: set the continuation bit */
      digit |= 0x80;
    }

    vbi_out[*vbi_bytes] = digit;
    (*vbi_bytes)++;
    DBG("MQTT - Encode VBI digit '%u' length '%i'\n", digit, val);

    if(val == 0 || *vbi_bytes >= 5) {
      break;
    }
  }
  DBG("MQTT - var_byte_int bytes %u\n", *vbi_bytes);
}
440 /*---------------------------------------------------------------------------*/
441 static void
442 keep_alive_callback(void *ptr)
443 {
444  struct mqtt_connection *conn = ptr;
445 
446  DBG("MQTT - (keep_alive_callback) Called!\n");
447 
448  /* The flag is set when the PINGREQ has been sent */
449  if(conn->waiting_for_pingresp) {
450  PRINTF("MQTT - Disconnect due to no PINGRESP from broker.\n");
451  disconnect_tcp(conn);
452  return;
453  }
454 
455  process_post(&mqtt_process, mqtt_do_pingreq_event, conn);
456 }
457 /*---------------------------------------------------------------------------*/
458 static void
459 reset_packet(struct mqtt_in_packet *packet)
460 {
461  memset(packet, 0, sizeof(struct mqtt_in_packet));
462 }
463 /*---------------------------------------------------------------------------*/
#if MQTT_5
/*
 * Protothread that writes an MQTTv5 property list to the output buffer.
 *
 * Without a list, a single 0 byte (property length zero) is written.
 * With a list, the pre-encoded total property length is written first,
 * followed by every property as its ID byte and its value bytes.
 *
 * The list cursor is static because protothread locals do not survive a
 * blocking wait.
 */
static
PT_THREAD(write_out_props(struct pt *pt, struct mqtt_connection *conn,
                          struct mqtt_prop_list *prop_list))
{
  static struct mqtt_prop_out_property *prop;

  PT_BEGIN(pt);

  if(!prop_list) {
    /* Write Property Length */
    DBG("MQTT - No properties to write\n");
    PT_MQTT_WRITE_BYTE(conn, 0);
  } else {
    DBG("MQTT - Writing %i property bytes\n", prop_list->properties_len + prop_list->properties_len_enc_bytes);
    /* Total length of all properties, already encoded */
    PT_MQTT_WRITE_BYTES(conn,
                        prop_list->properties_len_enc,
                        prop_list->properties_len_enc_bytes);

    /* Walk the list; the NULL check covers an empty list */
    prop = (struct mqtt_prop_out_property *)list_head(prop_list->props);
    do {
      if(prop != NULL) {
        DBG("MQTT - Property ID %i len %i\n", prop->id, prop->property_len);
        PT_MQTT_WRITE_BYTE(conn, prop->id);
        PT_MQTT_WRITE_BYTES(conn,
                            prop->val,
                            prop->property_len);
      }
      prop = (struct mqtt_prop_out_property *)list_item_next(prop);
    } while(prop != NULL);
  }

  PT_END(pt);
}
#endif
500 /*---------------------------------------------------------------------------*/
501 static
502 PT_THREAD(connect_pt(struct pt *pt, struct mqtt_connection *conn))
503 {
504  PT_BEGIN(pt);
505 
506 #if MQTT_5
507  static struct mqtt_prop_list *will_props = MQTT_PROP_LIST_NONE;
508  if(conn->will.properties) {
509  will_props = (struct mqtt_prop_list *)list_head(conn->will.properties);
510  }
511 #endif
512 
513  DBG("MQTT - Sending CONNECT message...\n");
514 
515  /* Set up FHDR */
516  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_CONNECT;
517  conn->out_packet.remaining_length = 0;
518  conn->out_packet.remaining_length += MQTT_CONNECT_VHDR_SIZE;
519  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->client_id);
520 #if (MQTT_PROTOCOL_VERSION > MQTT_PROTOCOL_VERSION_3_1) && MQTT_SRV_SUPPORTS_EMPTY_CLIENT_ID
521  /* Ensure we leave space for the 2 length bytes (which will encode 0) */
522  if(MQTT_STRING_LENGTH(&conn->client_id) == 0) {
523  conn->out_packet.remaining_length += 2;
524  }
525 #endif
526  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.username);
527  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.password);
528  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.topic);
529  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.message);
530 
531 #if MQTT_5
532  /* For connect properties */
533  conn->out_packet.remaining_length +=
534  conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
535  : 1;
536 
537  /* For will properties */
538  if(conn->connect_vhdr_flags & MQTT_VHDR_WILL_FLAG) {
539  conn->out_packet.remaining_length +=
540  will_props ? will_props->properties_len + will_props->properties_len_enc_bytes
541  : 1;
542  }
543 #endif
544 
545  mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
546  &conn->out_packet.remaining_length_enc_bytes,
547  conn->out_packet.remaining_length);
548  if(conn->out_packet.remaining_length_enc_bytes > 4) {
549  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
550  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
551  PT_EXIT(pt);
552  }
553 
554  /* Write Fixed Header */
555  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
556  PT_MQTT_WRITE_BYTES(conn,
557  conn->out_packet.remaining_length_enc,
558  conn->out_packet.remaining_length_enc_bytes);
559  PT_MQTT_WRITE_BYTE(conn, 0);
560  PT_MQTT_WRITE_BYTE(conn, strlen(MQTT_PROTOCOL_NAME));
561  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)MQTT_PROTOCOL_NAME, strlen(MQTT_PROTOCOL_NAME));
562  PT_MQTT_WRITE_BYTE(conn, MQTT_PROTOCOL_VERSION);
563  PT_MQTT_WRITE_BYTE(conn, conn->connect_vhdr_flags);
564  PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive >> 8));
565  PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive & 0x00FF));
566 
567 #if MQTT_5
568  /* Write Properties */
569  write_out_props(pt, conn, conn->out_props);
570 #endif
571 
572  /* Write Payload */
573  PT_MQTT_WRITE_BYTE(conn, conn->client_id.length >> 8);
574  PT_MQTT_WRITE_BYTE(conn, conn->client_id.length & 0x00FF);
575  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->client_id.string,
576  conn->client_id.length);
577 
578  if(conn->connect_vhdr_flags & MQTT_VHDR_WILL_FLAG) {
579 #if MQTT_5
580  /* Write Will Properties */
581  DBG("MQTT - Writing will properties\n");
582  write_out_props(pt, conn, will_props);
583 #endif
584  PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length >> 8);
585  PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length & 0x00FF);
586  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.topic.string,
587  conn->will.topic.length);
588  PT_MQTT_WRITE_BYTE(conn, conn->will.message.length >> 8);
589  PT_MQTT_WRITE_BYTE(conn, conn->will.message.length & 0x00FF);
590  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.message.string,
591  conn->will.message.length);
592  DBG("MQTT - Setting will topic to '%s' %u bytes and message to '%s' %u bytes\n",
593  conn->will.topic.string,
594  conn->will.topic.length,
595  conn->will.message.string,
596  conn->will.message.length);
597  }
598  if(conn->connect_vhdr_flags & MQTT_VHDR_USERNAME_FLAG) {
599  PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length >> 8);
600  PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length & 0x00FF);
601  PT_MQTT_WRITE_BYTES(conn,
602  (uint8_t *)conn->credentials.username.string,
603  conn->credentials.username.length);
604  }
605  if(conn->connect_vhdr_flags & MQTT_VHDR_PASSWORD_FLAG) {
606  PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length >> 8);
607  PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length & 0x00FF);
608  PT_MQTT_WRITE_BYTES(conn,
609  (uint8_t *)conn->credentials.password.string,
610  conn->credentials.password.length);
611  }
612 
613  /* Send out buffer */
614  send_out_buffer(conn);
615  conn->state = MQTT_CONN_STATE_CONNECTING_TO_BROKER;
616 
617  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
618 
619  /* Wait for CONNACK */
620  reset_packet(&conn->in_packet);
621  PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
622  timer_expired(&conn->t));
623  if(timer_expired(&conn->t)) {
624  DBG("Timeout waiting for CONNACK\n");
625  /* We stick to the letter of the spec here: Tear the connection down */
626 #if MQTT_5
627  mqtt_disconnect(conn, MQTT_PROP_LIST_NONE);
628 #else
629  mqtt_disconnect(conn);
630 #endif
631  }
632  reset_packet(&conn->in_packet);
633 
634  DBG("MQTT - Done sending CONNECT\n");
635 
636 #if DEBUG_MQTT == 1
637  DBG("MQTT - CONNECT message sent: \n");
638  uint16_t i;
639  for(i = 0; i < (conn->out_buffer_ptr - conn->out_buffer); i++) {
640  DBG("%02X ", conn->out_buffer[i]);
641  }
642  DBG("\n");
643 #endif
644 
645  PT_END(pt);
646 }
647 /*---------------------------------------------------------------------------*/
648 static
649 PT_THREAD(disconnect_pt(struct pt *pt, struct mqtt_connection *conn))
650 {
651  PT_BEGIN(pt);
652 
653  PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_DISCONNECT);
654  PT_MQTT_WRITE_BYTE(conn, 0);
655 
656 #if MQTT_5
657 /* Write Properties */
658  write_out_props(pt, conn, conn->out_props);
659 #endif
660 
661  send_out_buffer(conn);
662 
663  /*
664  * Wait a couple of seconds for a TCP ACK. We don't really need the ACK,
665  * we do want the TCP/IP stack to actually send this disconnect before we
666  * tear down the session.
667  */
668  timer_set(&conn->t, (CLOCK_SECOND * 2));
669  PT_WAIT_UNTIL(pt, conn->out_buffer_sent || timer_expired(&conn->t));
670 
671  PT_END(pt);
672 }
673 /*---------------------------------------------------------------------------*/
674 static
675 PT_THREAD(subscribe_pt(struct pt *pt, struct mqtt_connection *conn))
676 {
677  PT_BEGIN(pt);
678 
679  DBG("MQTT - Sending subscribe message! topic %s topic_length %i\n",
680  conn->out_packet.topic,
681  conn->out_packet.topic_length);
682  DBG("MQTT - Buffer space is %i \n",
683  &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
684 
685  /* Set up FHDR */
686  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_SUBSCRIBE | MQTT_FHDR_QOS_LEVEL_1;
687  conn->out_packet.remaining_length = MQTT_MID_SIZE +
688  MQTT_STRING_LEN_SIZE +
689  conn->out_packet.topic_length +
690  MQTT_QOS_SIZE;
691 
692 #if MQTT_5
693  conn->out_packet.remaining_length +=
694  conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
695  : 1;
696 #endif
697 
698  mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
699  &conn->out_packet.remaining_length_enc_bytes,
700  conn->out_packet.remaining_length);
701  if(conn->out_packet.remaining_length_enc_bytes > 4) {
702  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
703  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
704  PT_EXIT(pt);
705  }
706 
707  /* Write Fixed Header */
708  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
709  PT_MQTT_WRITE_BYTES(conn,
710  conn->out_packet.remaining_length_enc,
711  conn->out_packet.remaining_length_enc_bytes);
712  /* Write Variable Header */
713  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
714  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
715 
716 #if MQTT_5
717  /* Write Properties */
718  write_out_props(pt, conn, conn->out_props);
719 #endif
720 
721  /* Write Payload */
722  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
723  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
724  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
725  conn->out_packet.topic_length);
726 
727 #if MQTT_5
728  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.sub_options);
729 #else
730  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.qos);
731 #endif
732 
733  /* Send out buffer */
734  send_out_buffer(conn);
735  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
736 
737  /* Wait for SUBACK. */
738  reset_packet(&conn->in_packet);
739  PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
740  timer_expired(&conn->t));
741 
742  if(timer_expired(&conn->t)) {
743  DBG("Timeout waiting for SUBACK\n");
744  }
745  reset_packet(&conn->in_packet);
746 
747  /* This is clear after the entire transaction is complete */
748  conn->out_queue_full = 0;
749 
750  DBG("MQTT - Done in send_subscribe!\n");
751 
752  PT_END(pt);
753 }
754 /*---------------------------------------------------------------------------*/
755 static
756 PT_THREAD(unsubscribe_pt(struct pt *pt, struct mqtt_connection *conn))
757 {
758  PT_BEGIN(pt);
759 
760  DBG("MQTT - Sending unsubscribe message on topic %s topic_length %i\n",
761  conn->out_packet.topic,
762  conn->out_packet.topic_length);
763  DBG("MQTT - Buffer space is %i \n",
764  &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
765 
766  /* Set up FHDR */
767  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE |
768  MQTT_FHDR_QOS_LEVEL_1;
769  conn->out_packet.remaining_length = MQTT_MID_SIZE +
770  MQTT_STRING_LEN_SIZE +
771  conn->out_packet.topic_length;
772 
773 #if MQTT_5
774  conn->out_packet.remaining_length +=
775  conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
776  : 1;
777 #endif
778 
779  mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
780  &conn->out_packet.remaining_length_enc_bytes,
781  conn->out_packet.remaining_length);
782  if(conn->out_packet.remaining_length_enc_bytes > 4) {
783  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
784  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
785  PT_EXIT(pt);
786  }
787 
788  /* Write Fixed Header */
789  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
790  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
791  conn->out_packet.remaining_length_enc_bytes);
792 
793  /* Write Variable Header */
794  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
795  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
796 #if MQTT_5
797  /* Write Properties */
798  write_out_props(pt, conn, conn->out_props);
799 #endif
800 
801  /* Write Payload */
802  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
803  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
804  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
805  conn->out_packet.topic_length);
806 
807  /* Send out buffer */
808  send_out_buffer(conn);
809  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
810 
811  /* Wait for UNSUBACK */
812  reset_packet(&conn->in_packet);
813  PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
814  timer_expired(&conn->t));
815 
816  if(timer_expired(&conn->t)) {
817  DBG("Timeout waiting for UNSUBACK\n");
818  }
819 
820  reset_packet(&conn->in_packet);
821 
822  /* This is clear after the entire transaction is complete */
823  conn->out_queue_full = 0;
824 
825  DBG("MQTT - Done writing subscribe message to out buffer!\n");
826 
827  PT_END(pt);
828 }
829 /*---------------------------------------------------------------------------*/
830 static
831 PT_THREAD(publish_pt(struct pt *pt, struct mqtt_connection *conn))
832 {
833  PT_BEGIN(pt);
834 
835  DBG("MQTT - Sending publish message! topic %s topic_length %i\n",
836  conn->out_packet.topic,
837  conn->out_packet.topic_length);
838  DBG("MQTT - Buffer space is %i \n",
839  &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
840 
841  /* Set up FHDR */
842  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_PUBLISH |
843  conn->out_packet.qos << 1;
844  if(conn->out_packet.retain == MQTT_RETAIN_ON) {
845  conn->out_packet.fhdr |= MQTT_FHDR_RETAIN_FLAG;
846  }
847  conn->out_packet.remaining_length = MQTT_STRING_LEN_SIZE +
848  conn->out_packet.topic_length +
849  conn->out_packet.payload_size;
850  if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
851  conn->out_packet.remaining_length += MQTT_MID_SIZE;
852  }
853 
854 #if MQTT_5
855  conn->out_packet.remaining_length +=
856  conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
857  : 1;
858 #endif
859 
860  mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
861  &conn->out_packet.remaining_length_enc_bytes,
862  conn->out_packet.remaining_length);
863  if(conn->out_packet.remaining_length_enc_bytes > 4) {
864  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
865  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
866  PT_EXIT(pt);
867  }
868 
869  /* The DUP flag MUST be set to 0 for all QoS 0 messages */
870  if(conn->out_packet.qos == MQTT_QOS_LEVEL_0) {
871  conn->out_packet.fhdr &= ~MQTT_FHDR_DUP_FLAG;
872  }
873 
874  /* Write Fixed Header */
875  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
876  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
877  conn->out_packet.remaining_length_enc_bytes);
878  /* Write Variable Header */
879  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
880  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
881  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
882  conn->out_packet.topic_length);
883  if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
884  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8));
885  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
886  }
887 
888 #if MQTT_5
889  /* Write Properties */
890  write_out_props(pt, conn, conn->out_props);
891 #endif
892 
893  /* Write Payload */
894  PT_MQTT_WRITE_BYTES(conn,
895  conn->out_packet.payload,
896  conn->out_packet.payload_size);
897 
898  send_out_buffer(conn);
899  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
900 
901  /*
902  * If QoS is zero then wait until the message has been sent, since there is
903  * no ACK to wait for.
904  *
905  * Also notify the app will not be notified via PUBACK or PUBCOMP
906  */
907  if(conn->out_packet.qos == 0) {
908  process_post(conn->app_process, mqtt_update_event, NULL);
909  } else if(conn->out_packet.qos == 1) {
910  /* Wait for PUBACK */
911  reset_packet(&conn->in_packet);
912  PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
913  timer_expired(&conn->t));
914  if(timer_expired(&conn->t)) {
915  DBG("Timeout waiting for PUBACK\n");
916  }
917  if(conn->in_packet.mid != conn->out_packet.mid) {
918  DBG("MQTT - Warning, got PUBACK with none matching MID. Currently there "
919  "is no support for several concurrent PUBLISH messages.\n");
920  }
921  } else if(conn->out_packet.qos == 2) {
922  DBG("MQTT - QoS not implemented yet.\n");
923  /* Should wait for PUBREC, send PUBREL and then wait for PUBCOMP */
924  }
925 
926  reset_packet(&conn->in_packet);
927 
928  /* This is clear after the entire transaction is complete */
929  conn->out_queue_full = 0;
930 
931  DBG("MQTT - Publish Enqueued\n");
932 
933  PT_END(pt);
934 }
935 /*---------------------------------------------------------------------------*/
936 static
937 PT_THREAD(pingreq_pt(struct pt *pt, struct mqtt_connection *conn))
938 {
939  PT_BEGIN(pt);
940 
941  DBG("MQTT - Sending PINGREQ\n");
942 
943  /* Write Fixed Header */
944  PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_PINGREQ);
945  PT_MQTT_WRITE_BYTE(conn, 0);
946 
947  send_out_buffer(conn);
948 
949  /* Start timeout for reply. */
950  conn->waiting_for_pingresp = 1;
951 
952  /* Wait for PINGRESP or timeout */
953  reset_packet(&conn->in_packet);
954  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
955 
956  PT_WAIT_UNTIL(pt, conn->in_packet.packet_received || timer_expired(&conn->t));
957 
958  reset_packet(&conn->in_packet);
959 
960  conn->waiting_for_pingresp = 0;
961 
962  PT_END(pt);
963 }
964 /*---------------------------------------------------------------------------*/
965 #if MQTT_5
966 static
967 PT_THREAD(auth_pt(struct pt *pt, struct mqtt_connection *conn))
968 {
969  PT_BEGIN(pt);
970 
971  conn->out_packet.remaining_length +=
972  conn->out_props ? (conn->out_props->properties_len + conn->out_props->properties_len_enc_bytes)
973  : 1;
974 
975  mqtt_encode_var_byte_int(conn->out_packet.remaining_length_enc,
976  &conn->out_packet.remaining_length_enc_bytes,
977  conn->out_packet.remaining_length);
978 
979  if(conn->out_packet.remaining_length_enc_bytes > 4) {
980  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
981  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
982  PT_EXIT(pt);
983  }
984 
985  /* Write Fixed Header */
986  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
987  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
988  conn->out_packet.remaining_length_enc_bytes);
989 
990  /* Write Variable Header */
991  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.auth_reason_code);
992 
993  /* Write Properties */
994  write_out_props(pt, conn, conn->out_props);
995 
996  /* No Payload */
997  send_out_buffer(conn);
998 
999  PT_WAIT_UNTIL(pt, conn->out_buffer_sent);
1000 
1001  PT_END(pt);
1002 }
1003 #endif
1004 /*---------------------------------------------------------------------------*/
1005 static void
1006 handle_connack(struct mqtt_connection *conn)
1007 {
1008  struct mqtt_connack_event connack_event;
1009 
1010  DBG("MQTT - Got CONNACK\n");
1011 
1012 #if MQTT_PROTOCOL_VERSION <= MQTT_PROTOCOL_VERSION_3_1_1
1013  if(conn->in_packet.remaining_length != 2) {
1014  PRINTF("MQTT - CONNACK VHDR remaining length %i incorrect\n",
1015  conn->in_packet.remaining_length);
1016  call_event(conn,
1017  MQTT_EVENT_ERROR,
1018  NULL);
1019  abort_connection(conn);
1020  return;
1021  }
1022 
1023  if(conn->in_packet.payload[1] != 0) {
1024  PRINTF("MQTT - Connection refused with Return Code %i\n",
1025  conn->in_packet.payload[1]);
1026  call_event(conn,
1027  MQTT_EVENT_CONNECTION_REFUSED_ERROR,
1028  &conn->in_packet.payload[1]);
1029  abort_connection(conn);
1030  return;
1031  }
1032 #endif
1033 
1034  conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
1035 
1036 #if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_3_1_1
1037  connack_event.session_present = conn->in_packet.payload[0] & MQTT_VHDR_CONNACK_SESSION_PRESENT;
1038 #endif
1039 
1040 #if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_5
1041  /* The CONNACK VHDR must contain:
1042  * 0: Connect Acknowledge Flags
1043  * 1: Connect Reason Code
1044  * 2: Properties (whose Length field must be set even if no properties are present)
1045  */
1046  if(conn->in_packet.remaining_length < 3) {
1047  PRINTF("MQTT - CONNACK VHDR remaining length %i incorrect\n",
1048  conn->in_packet.remaining_length);
1049  call_event(conn,
1050  MQTT_EVENT_ERROR,
1051  NULL);
1052  abort_connection(conn);
1053  return;
1054  }
1055  mqtt_prop_parse_connack_props(conn);
1056 #endif
1057 
1058  ctimer_set(&conn->keep_alive_timer, conn->keep_alive * CLOCK_SECOND,
1059  keep_alive_callback, conn);
1060 
1061  /* Always reset packet before callback since it might be used directly */
1062  conn->state = MQTT_CONN_STATE_CONNECTED_TO_BROKER;
1063  call_event(conn, MQTT_EVENT_CONNECTED, &connack_event);
1064 }
1065 /*---------------------------------------------------------------------------*/
/*
 * Handles a received PINGRESP. Nothing on the connection is updated here;
 * the function only logs the reception.
 */
static void
handle_pingresp(struct mqtt_connection *conn)
{
  /* conn is unused when debug output is compiled out */
  (void)conn;

  DBG("MQTT - Got PINGRESP\n");
}
1071 /*---------------------------------------------------------------------------*/
1072 static void
1073 handle_suback(struct mqtt_connection *conn)
1074 {
1075  struct mqtt_suback_event suback_event;
1076 
1077  DBG("MQTT - Got SUBACK\n");
1078 
1079  /* Only accept SUBACKS with X topic QoS response, assume 1 */
1080 #if MQTT_5
1081  if(conn->in_packet.remaining_length > MQTT_MID_SIZE +
1082  MQTT_MAX_TOPICS_PER_SUBSCRIBE * MQTT_QOS_SIZE +
1083  conn->in_packet.properties_len + conn->in_packet.properties_enc_len) {
1084 #else
1085  if(conn->in_packet.remaining_length > MQTT_MID_SIZE +
1086  MQTT_MAX_TOPICS_PER_SUBSCRIBE * MQTT_QOS_SIZE) {
1087 #endif
1088  DBG("MQTT - Error, SUBACK with > 1 topic, not supported.\n");
1089  }
1090 
1091  conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
1092 
1093  suback_event.mid = conn->in_packet.mid;
1094 
1095 #if !MQTT_31
1096  suback_event.success = 0;
1097 
1098  switch(conn->in_packet.payload_start[0]) {
1099  case MQTT_SUBACK_RET_FAIL:
1100  PRINTF("MQTT - Error, SUBSCRIBE failed with SUBACK return code '%x'", conn->in_packet.payload_start[0]);
1101  break;
1102 
1103  case MQTT_SUBACK_RET_QOS_0:
1104  case MQTT_SUBACK_RET_QOS_1:
1105  case MQTT_SUBACK_RET_QOS_2:
1106  suback_event.qos_level = conn->in_packet.payload_start[0] & 0x03;
1107  suback_event.success = 1;
1108  break;
1109 
1110  default:
1111  PRINTF("MQTT - Error, Unrecognised SUBACK return code '%x'", conn->in_packet.payload_start[0]);
1112  break;
1113  }
1114 
1115  suback_event.return_code = conn->in_packet.payload_start[0];
1116 #else
1117  suback_event.qos_level = conn->in_packet.payload_start[0];
1118 #endif
1119 
1120  if(conn->in_packet.mid != conn->out_packet.mid) {
1121  DBG("MQTT - Warning, got SUBACK with none matching MID. Currently there is"
1122  "no support for several concurrent SUBSCRIBE messages.\n");
1123  }
1124 
1125  /* Always reset packet before callback since it might be used directly */
1126  call_event(conn, MQTT_EVENT_SUBACK, &suback_event);
1127 }
1128 /*---------------------------------------------------------------------------*/
1129 static void
1130 handle_unsuback(struct mqtt_connection *conn)
1131 {
1132  DBG("MQTT - Got UNSUBACK\n");
1133 
1134  conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
1135 
1136  if(conn->in_packet.mid != conn->out_packet.mid) {
1137  DBG("MQTT - Warning, got UNSUBACK with none matching MID. Currently there is"
1138  "no support for several concurrent UNSUBSCRIBE messages.\n");
1139  }
1140 
1141  call_event(conn, MQTT_EVENT_UNSUBACK, &conn->in_packet.mid);
1142 }
1143 /*---------------------------------------------------------------------------*/
1144 static void
1145 handle_puback(struct mqtt_connection *conn)
1146 {
1147  DBG("MQTT - Got PUBACK\n");
1148 
1149  conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
1150 
1151  call_event(conn, MQTT_EVENT_PUBACK, &conn->in_packet.mid);
1152 }
1153 /*---------------------------------------------------------------------------*/
1154 static mqtt_pub_status_t
1155 handle_publish(struct mqtt_connection *conn)
1156 {
1157  DBG("MQTT - Got PUBLISH, called once per manageable chunk of message.\n");
1158  DBG("MQTT - Handling publish on topic '%s'\n", conn->in_publish_msg.topic);
1159 
1160 #if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_3_1_1
1161  if(strlen(conn->in_publish_msg.topic) < conn->in_packet.topic_len) {
1162  DBG("NULL detected in received PUBLISH topic\n");
1163 #if MQTT_5
1164  mqtt_disconnect(conn, MQTT_PROP_LIST_NONE);
1165 #else
1166  mqtt_disconnect(conn);
1167 #endif
1168  return MQTT_PUBLISH_ERR;
1169  }
1170 #endif
1171 
1172  DBG("MQTT - This chunk is %i bytes\n", conn->in_publish_msg.payload_chunk_length);
1173 
1174  if(((conn->in_packet.fhdr & 0x09) >> 1) > 0) {
1175  PRINTF("MQTT - Error, got incoming PUBLISH with QoS > 0, not supported atm!\n");
1176  }
1177 
1178  call_event(conn, MQTT_EVENT_PUBLISH, &conn->in_publish_msg);
1179 
1180  if(conn->in_publish_msg.first_chunk == 1) {
1181  conn->in_publish_msg.first_chunk = 0;
1182  }
1183 
1184  /* If this is the last time handle_publish will be called, reset packet. */
1185  if(conn->in_publish_msg.payload_left == 0) {
1186 
1187  /* Check for QoS and initiate the reply, do not rely on the data in the
1188  * in_packet being untouched. */
1189 
1190  DBG("MQTT - (handle_publish) resetting packet.\n");
1191  reset_packet(&conn->in_packet);
1192  }
1193 
1194  return MQTT_PUBLISH_OK;
1195 }
1196 /*---------------------------------------------------------------------------*/
1197 static void
1198 parse_publish_vhdr(struct mqtt_connection *conn,
1199  uint32_t *pos,
1200  const uint8_t *input_data_ptr,
1201  int input_data_len)
1202 {
1203  uint16_t copy_bytes;
1204 
1205  /* Read out topic length */
1206  if(conn->in_packet.topic_len_received == 0) {
1207  conn->in_packet.topic_pos = 0;
1208  conn->in_packet.topic_len = (input_data_ptr[(*pos)++] << 8);
1209  conn->in_packet.byte_counter++;
1210  if(*pos >= input_data_len) {
1211  return;
1212  }
1213  conn->in_packet.topic_len |= input_data_ptr[(*pos)++];
1214  conn->in_packet.byte_counter++;
1215  conn->in_packet.topic_len_received = 1;
1216  /* Abort if topic is longer than our topic buffer */
1217  if(conn->in_packet.topic_len > MQTT_MAX_TOPIC_LENGTH) {
1218  DBG("MQTT - topic too long %u/%u\n", conn->in_packet.topic_len, MQTT_MAX_TOPIC_LENGTH);
1219  return;
1220  }
1221  DBG("MQTT - Read PUBLISH topic len %i\n", conn->in_packet.topic_len);
1222  /* WARNING: Check here if TOPIC fits in payload area, otherwise error */
1223  }
1224 
1225  /* Read out topic */
1226  if(conn->in_packet.topic_len_received == 1 &&
1227  conn->in_packet.topic_received == 0) {
1228  copy_bytes = MIN(conn->in_packet.topic_len - conn->in_packet.topic_pos,
1229  input_data_len - *pos);
1230  DBG("MQTT - topic_pos: %i copy_bytes: %i\n", conn->in_packet.topic_pos,
1231  copy_bytes);
1232  memcpy(&conn->in_publish_msg.topic[conn->in_packet.topic_pos],
1233  &input_data_ptr[*pos],
1234  copy_bytes);
1235  (*pos) += copy_bytes;
1236  conn->in_packet.byte_counter += copy_bytes;
1237  conn->in_packet.topic_pos += copy_bytes;
1238 
1239  if(conn->in_packet.topic_len - conn->in_packet.topic_pos == 0) {
1240  DBG("MQTT - Got topic '%s'", conn->in_publish_msg.topic);
1241  conn->in_packet.topic_received = 1;
1242  conn->in_publish_msg.topic[conn->in_packet.topic_pos] = '\0';
1243  conn->in_publish_msg.payload_length =
1244  conn->in_packet.remaining_length - conn->in_packet.topic_len - 2;
1245  conn->in_publish_msg.payload_left = conn->in_publish_msg.payload_length;
1246  }
1247 
1248  /* Set this once per incomming publish message */
1249  conn->in_publish_msg.first_chunk = 1;
1250  }
1251 }
1252 /*---------------------------------------------------------------------------*/
1253 /* MQTTv5 only */
1254 #if MQTT_5
1255 static void
1256 handle_disconnect(struct mqtt_connection *conn)
1257 {
1258  DBG("MQTT - (handle_disconnect) Got DISCONNECT.\n");
1259  call_event(conn, MQTT_EVENT_DISCONNECTED, NULL);
1260  abort_connection(conn);
1261 }
1262 /*---------------------------------------------------------------------------*/
1263 static void
1264 handle_auth(struct mqtt_connection *conn)
1265 {
1266  struct mqtt_prop_auth_event event;
1267 
1268  DBG("MQTT - (handle_auth) Got AUTH.\n");
1269 
1270  if((conn->in_packet.fhdr & 0x0F) != 0x0) {
1271  call_event(conn,
1272  MQTT_EVENT_ERROR,
1273  NULL);
1274  abort_connection(conn);
1275  return;
1276  }
1277 
1278  /* AUTH messages from the server */
1279  if(conn->state == MQTT_CONN_STATE_CONNECTING_TO_BROKER &&
1280  (!conn->in_packet.has_reason_code ||
1281  conn->in_packet.reason_code != MQTT_VHDR_RC_CONTINUE_AUTH)) {
1282  DBG("MQTT - (handle_auth) Not reauth - Reason Code 0x18 expected!\n");
1283  }
1284 
1285  mqtt_prop_parse_auth_props(conn, &event);
1286  call_event(conn, MQTT_EVENT_AUTH, &event);
1287 }
1288 #endif
1289 /*---------------------------------------------------------------------------*/
1290 static void
1291 parse_vhdr(struct mqtt_connection *conn)
1292 {
1293  conn->in_packet.payload_start = conn->in_packet.payload;
1294 
1295  /* Some message types include a packet identifier */
1296  switch(conn->in_packet.fhdr & 0xF0) {
1297  case MQTT_FHDR_MSG_TYPE_PUBACK:
1298  case MQTT_FHDR_MSG_TYPE_SUBACK:
1299  case MQTT_FHDR_MSG_TYPE_UNSUBACK:
1300  conn->in_packet.mid = (conn->in_packet.payload[0] << 8) |
1301  (conn->in_packet.payload[1]);
1302  conn->in_packet.payload_start += 2;
1303  break;
1304 
1305  /* Other message types have a 0-length VHDR */
1306  /* PUBLISH has a VHDR for QoS > 0, which is currently unsupported */
1307  default:
1308  break;
1309  }
1310 
1311 #if MQTT_5
1312  /* CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, DISCONNECT and AUTH have a single
1313  * Reason Code as part of the Variable Header.
1314  * SUBACK and UNSUBACK contain a list of one or more Reason Codes in the Payload.
1315  */
1316  switch(conn->in_packet.fhdr & 0xF0) {
1317  case MQTT_FHDR_MSG_TYPE_CONNACK:
1318  case MQTT_FHDR_MSG_TYPE_PUBACK:
1319  case MQTT_FHDR_MSG_TYPE_PUBREC:
1320  case MQTT_FHDR_MSG_TYPE_PUBREL:
1321  case MQTT_FHDR_MSG_TYPE_PUBCOMP:
1322  case MQTT_FHDR_MSG_TYPE_DISCONNECT:
1323  case MQTT_FHDR_MSG_TYPE_AUTH:
1324  conn->in_packet.reason_code = conn->in_packet.payload_start[0];
1325  conn->in_packet.has_reason_code = 1;
1326  conn->in_packet.payload_start += 1;
1327  break;
1328 
1329  default:
1330  conn->in_packet.has_reason_code = 0;
1331  break;
1332  }
1333 
1334  if(!conn->in_packet.has_props) {
1335  mqtt_prop_decode_input_props(conn);
1336  }
1337 #endif
1338 }
1339 /*---------------------------------------------------------------------------*/
1340 static int
1341 tcp_input(struct tcp_socket *s,
1342  void *ptr,
1343  const uint8_t *input_data_ptr,
1344  int input_data_len)
1345 {
1346  struct mqtt_connection *conn = ptr;
1347  uint32_t pos = 0;
1348  uint32_t copy_bytes = 0;
1349  mqtt_pub_status_t pub_status;
1350  uint8_t remaining_length_bytes;
1351 
1352  if(input_data_len == 0) {
1353  return 0;
1354  }
1355 
1356  if(conn->in_packet.packet_received) {
1357  reset_packet(&conn->in_packet);
1358  }
1359 
1360  DBG("tcp_input with %i bytes of data:\n", input_data_len);
1361 
1362  /* Read the fixed header field, if we do not have it */
1363  if(!conn->in_packet.fhdr) {
1364  conn->in_packet.fhdr = input_data_ptr[pos++];
1365  conn->in_packet.byte_counter++;
1366 
1367  DBG("MQTT - Read VHDR '%02X'\n", conn->in_packet.fhdr);
1368 
1369  if(pos >= input_data_len) {
1370  return 0;
1371  }
1372  }
1373 
1374  /* Read the Remaining Length field, if we do not have it */
1375  if(!conn->in_packet.has_remaining_length) {
1376  remaining_length_bytes =
1377  mqtt_decode_var_byte_int(input_data_ptr, input_data_len, &pos,
1378  &conn->in_packet.byte_counter,
1379  &conn->in_packet.remaining_length);
1380 
1381  if(remaining_length_bytes == 0) {
1382  call_event(conn, MQTT_EVENT_ERROR, NULL);
1383  return 0;
1384  }
1385 
1386  DBG("MQTT - Finished reading remaining length byte\n");
1387  conn->in_packet.has_remaining_length = 1;
1388  }
1389 
1390  /*
1391  * Check for unsupported payload length. Will read all incoming data from the
1392  * server in any case and then reset the packet.
1393  *
1394  * TODO: Decide if we, for example, want to disconnect instead.
1395  */
1396  if((conn->in_packet.remaining_length > MQTT_INPUT_BUFF_SIZE) &&
1397  (conn->in_packet.fhdr & 0xF0) != MQTT_FHDR_MSG_TYPE_PUBLISH) {
1398 
1399  PRINTF("MQTT - Error, unsupported payload size for non-PUBLISH message\n");
1400 
1401  conn->in_packet.byte_counter += input_data_len;
1402  if(conn->in_packet.byte_counter >=
1403  (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
1404  conn->in_packet.packet_received = 1;
1405  }
1406  return 0;
1407  }
1408 
1409  /*
1410  * Supported payload, reads out both VHDR and Payload of all packets.
1411  *
1412  * Note: There will always be at least one byte left to read when we enter
1413  * this loop.
1414  */
1415  while(conn->in_packet.byte_counter <
1416  (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
1417 
1418  if((conn->in_packet.fhdr & 0xF0) == MQTT_FHDR_MSG_TYPE_PUBLISH &&
1419  conn->in_packet.topic_received == 0) {
1420  parse_publish_vhdr(conn, &pos, input_data_ptr, input_data_len);
1421  }
1422 
1423  /* Read in as much as we can into the packet payload */
1424  copy_bytes = MIN(input_data_len - pos,
1425  MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos);
1426  DBG("- Copied %i payload bytes\n", copy_bytes);
1427  memcpy(&conn->in_packet.payload[conn->in_packet.payload_pos],
1428  &input_data_ptr[pos],
1429  copy_bytes);
1430  conn->in_packet.byte_counter += copy_bytes;
1431  conn->in_packet.payload_pos += copy_bytes;
1432  pos += copy_bytes;
1433 
1434 #if DEBUG_MQTT == 1
1435  uint32_t i;
1436  DBG("MQTT - Copied bytes: \n");
1437  for(i = 0; i < copy_bytes; i++) {
1438  DBG("%02X ", conn->in_packet.payload[i]);
1439  }
1440  DBG("\n");
1441 #endif
1442 
1443  /* Full buffer, shall only happen to PUBLISH messages. */
1444  if(MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos == 0) {
1445  conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1446  conn->in_publish_msg.payload_chunk_length = MQTT_INPUT_BUFF_SIZE;
1447  conn->in_publish_msg.payload_left -= MQTT_INPUT_BUFF_SIZE;
1448 
1449 #if MQTT_5
1450  if(!conn->in_packet.has_props) {
1451  mqtt_prop_decode_input_props(conn);
1452  }
1453 
1454  if(conn->in_publish_msg.first_chunk) {
1455  conn->in_publish_msg.payload_chunk_length -= conn->in_packet.properties_len +
1456  conn->in_packet.properties_enc_len;
1457 
1458  /* Payload chunk should point past the MQTT properties and to the payload itself */
1459  conn->in_publish_msg.payload_chunk += conn->in_packet.properties_len +
1460  conn->in_packet.properties_enc_len;
1461  }
1462 #endif
1463 
1464  pub_status = handle_publish(conn);
1465 
1466  conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1467  conn->in_packet.payload_pos = 0;
1468 
1469  if(pub_status != MQTT_PUBLISH_OK) {
1470  return 0;
1471  }
1472  }
1473 
1474  if(pos >= input_data_len &&
1475  (conn->in_packet.byte_counter < (MQTT_FHDR_SIZE + conn->in_packet.remaining_length))) {
1476  return 0;
1477  }
1478  }
1479 
1480  parse_vhdr(conn);
1481 
1482  /* Debug information */
1483  DBG("\n");
1484  /* Take care of input */
1485  DBG("MQTT - Finished reading packet!\n");
1486  /* What to return? */
1487  DBG("MQTT - total data was %i bytes of data. \n",
1488  (MQTT_FHDR_SIZE + conn->in_packet.remaining_length));
1489 
1490 #if MQTT_5
1491  if(conn->in_packet.has_reason_code &&
1492  conn->in_packet.reason_code >= MQTT_VHDR_RC_UNSPEC_ERR) {
1493  PRINTF("MQTT - Reason Code indicated error %i\n",
1494  conn->in_packet.reason_code);
1495  call_event(conn,
1496  MQTT_EVENT_ERROR,
1497  NULL);
1498  abort_connection(conn);
1499  return 0;
1500  }
1501 #endif
1502 
1503  /* Handle packet here. */
1504  switch(conn->in_packet.fhdr & 0xF0) {
1505  case MQTT_FHDR_MSG_TYPE_CONNACK:
1506  handle_connack(conn);
1507  break;
1508  case MQTT_FHDR_MSG_TYPE_PUBLISH:
1509  /* This is the only or the last chunk of publish payload */
1510  conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1511  conn->in_publish_msg.payload_chunk_length = conn->in_packet.payload_pos;
1512  conn->in_publish_msg.payload_left = 0;
1513 
1514  DBG("MQTT - First chunk? %i\n", conn->in_publish_msg.first_chunk);
1515 #if MQTT_5
1516  if(conn->in_publish_msg.first_chunk) {
1517  conn->in_publish_msg.payload_chunk_length -= conn->in_packet.properties_len +
1518  conn->in_packet.properties_enc_len;
1519  /* Payload chunk should point past the MQTT properties and to the payload itself */
1520  conn->in_publish_msg.payload_chunk += conn->in_packet.properties_len +
1521  conn->in_packet.properties_enc_len;
1522  }
1523 #endif
1524  (void)handle_publish(conn);
1525  break;
1526  case MQTT_FHDR_MSG_TYPE_PUBACK:
1527  handle_puback(conn);
1528  break;
1529  case MQTT_FHDR_MSG_TYPE_SUBACK:
1530  handle_suback(conn);
1531  break;
1532  case MQTT_FHDR_MSG_TYPE_UNSUBACK:
1533  handle_unsuback(conn);
1534  break;
1535  case MQTT_FHDR_MSG_TYPE_PINGRESP:
1536  handle_pingresp(conn);
1537  break;
1538 
1539  /* QoS 2 not implemented yet */
1540  case MQTT_FHDR_MSG_TYPE_PUBREC:
1541  case MQTT_FHDR_MSG_TYPE_PUBREL:
1542  case MQTT_FHDR_MSG_TYPE_PUBCOMP:
1543  call_event(conn, MQTT_EVENT_NOT_IMPLEMENTED_ERROR, NULL);
1544  PRINTF("MQTT - Got unhandled MQTT Message Type '%i'",
1545  (conn->in_packet.fhdr & 0xF0));
1546  break;
1547 
1548 #if MQTT_PROTOCOL_VERSION >= MQTT_PROTOCOL_VERSION_5
1549  case MQTT_FHDR_MSG_TYPE_DISCONNECT:
1550  handle_disconnect(conn);
1551  break;
1552 
1553  case MQTT_FHDR_MSG_TYPE_AUTH:
1554  handle_auth(conn);
1555  break;
1556 #endif
1557 
1558  default:
1559  /* All server-only message */
1560  PRINTF("MQTT - Got MQTT Message Type '%i'", (conn->in_packet.fhdr & 0xF0));
1561  break;
1562  }
1563 
1564  conn->in_packet.packet_received = 1;
1565 
1566  return 0;
1567 }
1568 /*---------------------------------------------------------------------------*/
1569 /*
1570  * Handles TCP events from Simple TCP
1571  */
1572 static void
1573 tcp_event(struct tcp_socket *s, void *ptr, tcp_socket_event_t event)
1574 {
1575  struct mqtt_connection *conn = ptr;
1576 
1577  /* Take care of event */
1578  switch(event) {
1579 
1580  /* Fall through to manage different disconnect event the same way. */
1581  case TCP_SOCKET_CLOSED:
1582  case TCP_SOCKET_TIMEDOUT:
1583  case TCP_SOCKET_ABORTED: {
1584 
1585  DBG("MQTT - Disconnected by tcp event %d\n", event);
1586  process_post(&mqtt_process, mqtt_abort_now_event, conn);
1587  conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
1588  ctimer_stop(&conn->keep_alive_timer);
1589  call_event(conn, MQTT_EVENT_DISCONNECTED, &event);
1590  abort_connection(conn);
1591 
1592  /* If connecting retry */
1593  if(conn->auto_reconnect == 1) {
1594  connect_tcp(conn);
1595  }
1596  break;
1597  }
1598  case TCP_SOCKET_CONNECTED: {
1599  conn->state = MQTT_CONN_STATE_TCP_CONNECTED;
1600  conn->out_buffer_sent = 1;
1601 
1602  process_post(&mqtt_process, mqtt_do_connect_mqtt_event, conn);
1603  break;
1604  }
1605  case TCP_SOCKET_DATA_SENT: {
1606  DBG("MQTT - Got TCP_DATA_SENT\n");
1607 
1608  if(conn->socket.output_data_len == 0) {
1609  conn->out_buffer_sent = 1;
1610  conn->out_buffer_ptr = conn->out_buffer;
1611  }
1612 
1613  ctimer_restart(&conn->keep_alive_timer);
1614  break;
1615  }
1616 
1617  default: {
1618  DBG("MQTT - TCP Event %d is currently not managed by the tcp event callback\n",
1619  event);
1620  }
1621  }
1622 }
1623 /*---------------------------------------------------------------------------*/
1624 PROCESS_THREAD(mqtt_process, ev, data)
1625 {
1626  static struct mqtt_connection *conn;
1627 
1628  PROCESS_BEGIN();
1629 
1630  while(1) {
1632 
1633  if(ev == mqtt_abort_now_event) {
1634  DBG("MQTT - Abort\n");
1635  conn = data;
1636  conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE;
1637 
1638  abort_connection(conn);
1639  }
1640  if(ev == mqtt_do_connect_tcp_event) {
1641  conn = data;
1642  DBG("MQTT - Got mqtt_do_connect_tcp_event!\n");
1643  connect_tcp(conn);
1644  }
1645  if(ev == mqtt_do_connect_mqtt_event) {
1646  conn = data;
1647  conn->socket.output_data_max_seg = conn->max_segment_size;
1648  DBG("MQTT - Got mqtt_do_connect_mqtt_event!\n");
1649 
1650  if(conn->out_buffer_sent == 1) {
1651  PT_INIT(&conn->out_proto_thread);
1652  while(connect_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1653  conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) {
1654  PT_MQTT_WAIT_SEND();
1655  }
1656  }
1657  }
1658  if(ev == mqtt_do_disconnect_mqtt_event) {
1659  conn = data;
1660  DBG("MQTT - Got mqtt_do_disconnect_mqtt_event!\n");
1661 
1662  /* Send MQTT Disconnect if we are connected */
1663  if(conn->state == MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT) {
1664  if(conn->out_buffer_sent == 1) {
1665  PT_INIT(&conn->out_proto_thread);
1666  while(conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE &&
1667  disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1668  PT_MQTT_WAIT_SEND();
1669  }
1670  abort_connection(conn);
1671  call_event(conn, MQTT_EVENT_DISCONNECTED, &ev);
1672  } else {
1673  process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
1674  }
1675  }
1676  }
1677  if(ev == mqtt_do_pingreq_event) {
1678  conn = data;
1679  DBG("MQTT - Got mqtt_do_pingreq_event!\n");
1680 
1681  if(conn->out_buffer_sent == 1 &&
1682  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1683  PT_INIT(&conn->out_proto_thread);
1684  while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1685  pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1686  PT_MQTT_WAIT_SEND();
1687  }
1688  }
1689  }
1690  if(ev == mqtt_do_subscribe_event) {
1691  conn = data;
1692  DBG("MQTT - Got mqtt_do_subscribe_mqtt_event!\n");
1693 
1694  if(conn->out_buffer_sent == 1 &&
1695  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1696  PT_INIT(&conn->out_proto_thread);
1697  while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1698  subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1699  PT_MQTT_WAIT_SEND();
1700  }
1701  }
1702  }
1703  if(ev == mqtt_do_unsubscribe_event) {
1704  conn = data;
1705  DBG("MQTT - Got mqtt_do_unsubscribe_mqtt_event!\n");
1706 
1707  if(conn->out_buffer_sent == 1 &&
1708  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1709  PT_INIT(&conn->out_proto_thread);
1710  while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1711  unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1712  PT_MQTT_WAIT_SEND();
1713  }
1714  }
1715  }
1716  if(ev == mqtt_do_publish_event) {
1717  conn = data;
1718  DBG("MQTT - Got mqtt_do_publish_mqtt_event!\n");
1719 
1720  if(conn->out_buffer_sent == 1 &&
1721  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1722  PT_INIT(&conn->out_proto_thread);
1723  while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1724  publish_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1725  PT_MQTT_WAIT_SEND();
1726  }
1727  }
1728  }
1729 #if MQTT_5
1730  if(ev == mqtt_do_auth_event) {
1731  conn = data;
1732  DBG("MQTT - Got mqtt_do_auth_event!\n");
1733 
1734  if(conn->out_buffer_sent == 1) {
1735  PT_INIT(&conn->out_proto_thread);
1736  while(auth_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1737  PT_MQTT_WAIT_SEND();
1738  }
1739  }
1740  }
1741  /* clear output properties; the next message sent should overwrite them */
1742  conn->out_props = NULL;
1743 #endif
1744  }
1745  PROCESS_END();
1746 }
1747 /*---------------------------------------------------------------------------*/
1748 void
1749 mqtt_init(void)
1750 {
1751  static uint8_t inited = 0;
1752  if(!inited) {
1753  mqtt_do_connect_tcp_event = process_alloc_event();
1754  mqtt_event_min = mqtt_do_connect_tcp_event;
1755 
1756  mqtt_do_connect_mqtt_event = process_alloc_event();
1757  mqtt_do_disconnect_mqtt_event = process_alloc_event();
1758  mqtt_do_subscribe_event = process_alloc_event();
1759  mqtt_do_unsubscribe_event = process_alloc_event();
1760  mqtt_do_publish_event = process_alloc_event();
1761  mqtt_do_pingreq_event = process_alloc_event();
1762  mqtt_update_event = process_alloc_event();
1763  mqtt_abort_now_event = process_alloc_event();
1764  mqtt_event_max = mqtt_abort_now_event;
1765 
1766  mqtt_continue_send_event = process_alloc_event();
1767  mqtt_do_auth_event = process_alloc_event();
1768 
1769  list_init(mqtt_conn_list);
1770 
1771  process_start(&mqtt_process, NULL);
1772  inited = 1;
1773  }
1774 }
1775 /*---------------------------------------------------------------------------*/
1776 mqtt_status_t
1777 mqtt_register(struct mqtt_connection *conn, struct process *app_process,
1778  char *client_id, mqtt_event_callback_t event_callback,
1779  uint16_t max_segment_size)
1780 {
1781 #if MQTT_31 || !MQTT_SRV_SUPPORTS_EMPTY_CLIENT_ID
1782  if(strlen(client_id) < 1) {
1783  return MQTT_STATUS_INVALID_ARGS_ERROR;
1784  }
1785 #endif
1786 
1787  /* Set defaults - Set all to zero to begin with */
1788  memset(conn, 0, sizeof(struct mqtt_connection));
1789 #if MQTT_5
1790  /* Server capabilities have non-zero defaults */
1791  conn->srv_feature_en = -1;
1792 #endif
1793  string_to_mqtt_string(&conn->client_id, client_id);
1794  conn->event_callback = event_callback;
1795  conn->app_process = app_process;
1796  conn->auto_reconnect = 1;
1797  conn->max_segment_size = max_segment_size;
1798 
1799  reset_defaults(conn);
1800 
1801  mqtt_init();
1802 
1803  list_add(mqtt_conn_list, conn);
1804 
1805  DBG("MQTT - Registered successfully\n");
1806 
1807  return MQTT_STATUS_OK;
1808 }
1809 /*---------------------------------------------------------------------------*/
1810 /*
1811  * Connect to MQTT broker.
1812  *
1813  * N.B. Non-blocking call.
1814  */
1815 mqtt_status_t
1816 mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port,
1817  uint16_t keep_alive,
1818 #if MQTT_5
1819  uint8_t clean_session,
1820  struct mqtt_prop_list *prop_list)
1821 #else
1822  uint8_t clean_session)
1823 #endif
1824 {
1825  uip_ip6addr_t ip6addr;
1826  uip_ipaddr_t *ipaddr;
1827  ipaddr = &ip6addr;
1828 
1829  /* Check if we are already trying to connect */
1830  if(conn->state > MQTT_CONN_STATE_NOT_CONNECTED) {
1831  return MQTT_STATUS_OK;
1832  }
1833 
1834  conn->server_host = host;
1835  conn->keep_alive = keep_alive;
1836  conn->server_port = port;
1837  conn->out_buffer_ptr = conn->out_buffer;
1838  conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1839 
1840  /* If the Client supplies a zero-byte ClientId, the Client MUST also set CleanSession to 1 */
1841  if(clean_session || (conn->client_id.length == 0)) {
1842  conn->connect_vhdr_flags |= MQTT_VHDR_CLEAN_SESSION_FLAG;
1843  }
1844 
1845  /* convert the string IPv6 address to a numeric IPv6 address */
1846  if(uiplib_ip6addrconv(host, &ip6addr) == 0) {
1847  return MQTT_STATUS_ERROR;
1848  }
1849 
1850  uip_ipaddr_copy(&(conn->server_ip), ipaddr);
1851 
1852  /*
1853  * Initiate the connection if the IP could be resolved. Otherwise the
1854  * connection will be initiated when the DNS lookup is finished, in the main
1855  * event loop.
1856  */
1857 #if MQTT_5
1858  conn->out_props = prop_list;
1859 #endif
1860 
1861  process_post(&mqtt_process, mqtt_do_connect_tcp_event, conn);
1862 
1863  return MQTT_STATUS_OK;
1864 }
1865 /*----------------------------------------------------------------------------*/
1866 void
1867 #if MQTT_5
1868 mqtt_disconnect(struct mqtt_connection *conn,
1869  struct mqtt_prop_list *prop_list)
1870 #else
1871 mqtt_disconnect(struct mqtt_connection *conn)
1872 #endif
1873 {
1874  if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1875  return;
1876  }
1877 
1878  conn->state = MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT;
1879 
1880 #if MQTT_5
1881  conn->out_props = prop_list;
1882 #endif
1883 
1884  process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
1885 }
1886 /*----------------------------------------------------------------------------*/
1887 mqtt_status_t
1888 mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic,
1889 #if MQTT_5
1890  mqtt_qos_level_t qos_level,
1891  mqtt_nl_en_t nl, mqtt_rap_en_t rap,
1892  mqtt_retain_handling_t ret_handling,
1893  struct mqtt_prop_list *prop_list)
1894 #else
1895  mqtt_qos_level_t qos_level)
1896 #endif
1897 {
1898  if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1899  return MQTT_STATUS_NOT_CONNECTED_ERROR;
1900  }
1901 
1902  DBG("MQTT - Call to mqtt_subscribe...\n");
1903 
1904  /* Currently don't have a queue, so only one item at a time */
1905  if(conn->out_queue_full) {
1906  DBG("MQTT - Not accepted!\n");
1907  return MQTT_STATUS_OUT_QUEUE_FULL;
1908  }
1909  conn->out_queue_full = 1;
1910  DBG("MQTT - Accepted!\n");
1911 
1912  conn->out_packet.mid = INCREMENT_MID(conn);
1913  conn->out_packet.topic = topic;
1914  conn->out_packet.topic_length = strlen(topic);
1915  conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1916 
1917  if(mid) {
1918  *mid = conn->out_packet.mid;
1919  }
1920 
1921 #if MQTT_5
1922  conn->out_packet.sub_options = 0x00;
1923  conn->out_packet.sub_options |= qos_level & MQTT_SUB_OPTION_QOS;
1924  conn->out_packet.sub_options |= nl & MQTT_SUB_OPTION_NL;
1925  conn->out_packet.sub_options |= rap & MQTT_SUB_OPTION_RAP;
1926  conn->out_packet.sub_options |= ret_handling & MQTT_SUB_OPTION_RETAIN_HANDLING;
1927 #else
1928  conn->out_packet.qos = qos_level;
1929 #endif
1930 
1931 #if MQTT_5
1932  conn->out_props = prop_list;
1933 #endif
1934 
1935  process_post(&mqtt_process, mqtt_do_subscribe_event, conn);
1936  return MQTT_STATUS_OK;
1937 }
1938 /*----------------------------------------------------------------------------*/
1939 mqtt_status_t
1940 mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid,
1941 #if MQTT_5
1942  char *topic,
1943  struct mqtt_prop_list *prop_list)
1944 #else
1945  char *topic)
1946 #endif
1947 {
1948  if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1949  return MQTT_STATUS_NOT_CONNECTED_ERROR;
1950  }
1951 
1952  DBG("MQTT - Call to mqtt_unsubscribe...\n");
1953  /* Currently don't have a queue, so only one item at a time */
1954  if(conn->out_queue_full) {
1955  DBG("MQTT - Not accepted!\n");
1956  return MQTT_STATUS_OUT_QUEUE_FULL;
1957  }
1958  conn->out_queue_full = 1;
1959  DBG("MQTT - Accepted!\n");
1960 
1961  conn->out_packet.mid = INCREMENT_MID(conn);
1962  conn->out_packet.topic = topic;
1963  conn->out_packet.topic_length = strlen(topic);
1964  conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1965 
1966  if(mid) {
1967  *mid = conn->out_packet.mid;
1968  }
1969 
1970 #if MQTT_5
1971  conn->out_props = prop_list;
1972 #endif
1973 
1974  process_post(&mqtt_process, mqtt_do_unsubscribe_event, conn);
1975  return MQTT_STATUS_OK;
1976 }
1977 /*----------------------------------------------------------------------------*/
1978 mqtt_status_t
1979 mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic,
1980  uint8_t *payload, uint32_t payload_size,
1981  mqtt_qos_level_t qos_level,
1982 #if MQTT_5
1983  mqtt_retain_t retain,
1984  uint8_t topic_alias, mqtt_topic_alias_en_t topic_alias_en,
1985  struct mqtt_prop_list *prop_list)
1986 #else
1987  mqtt_retain_t retain)
1988 #endif
1989 {
1990  if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1991  return MQTT_STATUS_NOT_CONNECTED_ERROR;
1992  }
1993 
1994  DBG("MQTT - Call to mqtt_publish...\n");
1995 
1996  /* Currently don't have a queue, so only one item at a time */
1997  if(conn->out_queue_full) {
1998  DBG("MQTT - Not accepted!\n");
1999  return MQTT_STATUS_OUT_QUEUE_FULL;
2000  }
2001  conn->out_queue_full = 1;
2002  DBG("MQTT - Accepted!\n");
2003 
2004  conn->out_packet.mid = INCREMENT_MID(conn);
2005  conn->out_packet.retain = retain;
2006 #if MQTT_5
2007  if(topic_alias_en == MQTT_TOPIC_ALIAS_ON) {
2008  conn->out_packet.topic = "";
2009  conn->out_packet.topic_length = 0;
2010  conn->out_packet.topic_alias = topic_alias;
2011  if(topic_alias == 0) {
2012  DBG("MQTT - Error, a topic alias of 0 is not permitted! It won't be sent.\n");
2013  }
2014  } else {
2015  conn->out_packet.topic = topic;
2016  conn->out_packet.topic_length = strlen(topic);
2017  conn->out_packet.topic_alias = 0;
2018  }
2019 #else
2020  conn->out_packet.topic = topic;
2021  conn->out_packet.topic_length = strlen(topic);
2022 #endif
2023  conn->out_packet.payload = payload;
2024  conn->out_packet.payload_size = payload_size;
2025  conn->out_packet.qos = qos_level;
2026  conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
2027 
2028  if(mid) {
2029  *mid = conn->out_packet.mid;
2030  }
2031 
2032 #if MQTT_5
2033  conn->out_props = prop_list;
2034 #endif
2035 
2036  process_post(&mqtt_process, mqtt_do_publish_event, conn);
2037  return MQTT_STATUS_OK;
2038 }
2039 /*----------------------------------------------------------------------------*/
2040 void
2041 mqtt_set_username_password(struct mqtt_connection *conn, char *username,
2042  char *password)
2043 {
2044  /* Set strings, NULL string will simply set length to zero */
2045  string_to_mqtt_string(&conn->credentials.username, username);
2046  string_to_mqtt_string(&conn->credentials.password, password);
2047 
2048  /* Set CONNECT VHDR flags */
2049  if(username != NULL) {
2050  conn->connect_vhdr_flags |= MQTT_VHDR_USERNAME_FLAG;
2051  } else {
2052  conn->connect_vhdr_flags &= ~MQTT_VHDR_USERNAME_FLAG;
2053  }
2054  if(password != NULL) {
2055  conn->connect_vhdr_flags |= MQTT_VHDR_PASSWORD_FLAG;
2056  } else {
2057  conn->connect_vhdr_flags &= ~MQTT_VHDR_PASSWORD_FLAG;
2058  }
2059 }
2060 /*----------------------------------------------------------------------------*/
2061 void
2062 mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message,
2063 #if MQTT_5
2064  mqtt_qos_level_t qos, struct mqtt_prop_list *will_props)
2065 #else
2066  mqtt_qos_level_t qos)
2067 #endif
2068 {
2069  /* Set strings, NULL string will simply set length to zero */
2070  string_to_mqtt_string(&conn->will.topic, topic);
2071  string_to_mqtt_string(&conn->will.message, message);
2072 
2073  /* Currently not used! */
2074  conn->will.qos = qos;
2075 
2076  if(topic != NULL) {
2077  conn->connect_vhdr_flags |= MQTT_VHDR_WILL_FLAG |
2078  MQTT_VHDR_WILL_RETAIN_FLAG;
2079  }
2080 #if MQTT_5
2081  conn->will.properties = (list_t)will_props;
2082 #endif
2083 }
2084 /*---------------------------------------------------------------------------*/
#if MQTT_5
/*----------------------------------------------------------------------------*/
/* MQTTv5-specific functions */
/*----------------------------------------------------------------------------*/
/*
 * Send authentication data to broker.
 *
 * N.B. Non-blocking call: the AUTH packet is described in conn->out_packet
 * and mqtt_do_auth_event is posted to mqtt_process.
 *
 * conn      - connection to use
 * auth_type - added to MQTT_VHDR_RC_CONTINUE_AUTH to form the reason code
 * prop_list - stored in conn->out_props
 *
 * Returns MQTT_STATUS_OK if the event was posted and MQTT_STATUS_ERROR if
 * it could not be posted.
 */
mqtt_status_t
mqtt_auth(struct mqtt_connection *conn,
          mqtt_auth_type_t auth_type,
          struct mqtt_prop_list *prop_list)
{
  DBG("MQTT - Call to mqtt_auth...\n");

  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_AUTH;
  conn->out_packet.remaining_length = 1; /* for the auth reason code */
  conn->out_packet.auth_reason_code = MQTT_VHDR_RC_CONTINUE_AUTH + auth_type;

  conn->out_props = prop_list;

  if(process_post(&mqtt_process, mqtt_do_auth_event, conn) !=
     PROCESS_ERR_OK) {
    /* The event was not queued, so the AUTH packet will not be sent */
    return MQTT_STATUS_ERROR;
  }

  return MQTT_STATUS_OK;
}
#endif
2111 /*----------------------------------------------------------------------------*/
2112 /** @} */
static uip_ipaddr_t ipaddr
Pointer to prefix information option in uip_buf.
Definition: uip-nd6.c:116
mqtt_event_t
MQTT engine events.
Definition: mqtt.h:218
mqtt_status_t mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic, struct mqtt_prop_list *prop_list)
Unsubscribes from a MQTT topic.
Definition: mqtt.c:1940
void timer_set(struct timer *t, clock_time_t interval)
Set a timer.
Definition: timer.c:64
#define PROCESS(name, strname)
Declare a process.
Definition: process.h:307
Protothreads implementation.
void ctimer_stop(struct ctimer *c)
Stop a pending callback timer.
Definition: ctimer.c:149
void mqtt_disconnect(struct mqtt_connection *conn, struct mqtt_prop_list *prop_list)
Disconnects from a MQTT broker.
Definition: mqtt.c:1868
#define PROCESS_WAIT_EVENT()
Wait for an event to be posted to the process.
Definition: process.h:141
#define PROCESS_BEGIN()
Define the beginning of a process.
Definition: process.h:120
#define PROCESS_END()
Define the end of a process.
Definition: process.h:131
mqtt_status_t mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic, mqtt_qos_level_t qos_level, mqtt_nl_en_t nl, mqtt_rap_en_t rap, mqtt_retain_handling_t ret_handling, struct mqtt_prop_list *prop_list)
Subscribes to a MQTT topic.
Definition: mqtt.c:1888
void ** list_t
The linked list type.
Definition: list.h:136
#define PT_BEGIN(pt)
Declare the start of a protothread inside the C function implementing the protothread.
Definition: pt.h:280
#define PT_WAIT_UNTIL(pt, condition)
Block and wait until condition is true.
Definition: pt.h:313
void(* mqtt_event_callback_t)(struct mqtt_connection *m, mqtt_event_t event, void *data)
MQTT event callback function.
Definition: mqtt.h:499
mqtt_status_t mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port, uint16_t keep_alive, uint8_t clean_session, struct mqtt_prop_list *prop_list)
Connects to a MQTT broker.
Definition: mqtt.c:1816
Header file for IPv6-related data structures.
#define PT_INIT(pt)
Initialize a protothread.
Definition: pt.h:245
mqtt_status_t mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic, uint8_t *payload, uint32_t payload_size, mqtt_qos_level_t qos_level, mqtt_retain_t retain, uint8_t topic_alias, mqtt_topic_alias_en_t topic_alias_en, struct mqtt_prop_list *prop_list)
Publish to a MQTT topic.
Definition: mqtt.c:1979
#define CLOCK_SECOND
A second, measured in system clock time.
Definition: clock.h:82
Header file for the callback timer
#define PT_END(pt)
Declare the end of a protothread.
Definition: pt.h:292
Event timer header file.
Linked list manipulation routines.
void mqtt_set_username_password(struct mqtt_connection *conn, char *username, char *password)
Set the user name and password for a MQTT client.
Definition: mqtt.c:2041
void * list_head(list_t list)
Get a pointer to the first element of a list.
Definition: list.c:82
mqtt_status_t mqtt_auth(struct mqtt_connection *conn, mqtt_auth_type_t auth_type, struct mqtt_prop_list *prop_list)
Send authentication message (MQTTv5-only).
Definition: mqtt.c:2095
Header file for the Contiki MQTT engine.
void ctimer_set(struct ctimer *c, clock_time_t t, void(*f)(void *), void *ptr)
Set a callback timer.
Definition: ctimer.c:99
int timer_expired(struct timer *t)
Check if a timer has expired.
Definition: timer.c:123
#define uip_ipaddr_copy(dest, src)
Copy an IP address from one place to another.
Definition: uip.h:1015
void mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message, mqtt_qos_level_t qos, struct mqtt_prop_list *will_props)
Set the last will topic and message for a MQTT client.
Definition: mqtt.c:2062
#define PT_EXIT(pt)
Exit the protothread.
Definition: pt.h:411
void ctimer_restart(struct ctimer *c)
Restart a callback timer from the current point in time.
Definition: ctimer.c:137
#define PT_THREAD(name_args)
Declaration of a protothread.
Definition: pt.h:265
process_event_t process_alloc_event(void)
Allocate a global event number.
Definition: process.c:93
void list_add(list_t list, void *item)
Add an item at the end of a list.
Definition: list.c:142
void list_init(list_t list)
Initialize a list.
Definition: list.c:65
Header file for the uIP TCP/IP stack.
#define LIST(name)
Declare a linked list.
Definition: list.h:89
mqtt_status_t mqtt_register(struct mqtt_connection *conn, struct process *app_process, char *client_id, mqtt_event_callback_t event_callback, uint16_t max_segment_size)
Initializes the MQTT engine.
Definition: mqtt.c:1777
int process_post(struct process *p, process_event_t ev, process_data_t data)
Post an asynchronous event.
Definition: process.c:322
Default definitions of C compiler quirk work-arounds.
PROCESS_THREAD(cc2538_rf_process, ev, data)
Implementation of the cc2538 RF driver process.
Definition: cc2538-rf.c:1110
Header file for the LED HAL.
void * list_item_next(void *item)
Get the next item following this item.
Definition: list.c:322
void process_start(struct process *p, process_data_t data)
Start a process.
Definition: process.c:99