Contiki-NG
mqtt.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015, Texas Instruments Incorporated - http://www.ti.com/
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the copyright holder nor the names of its
14  * contributors may be used to endorse or promote products derived
15  * from this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
20  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
21  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
22  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
26  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
28  * OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30 /*---------------------------------------------------------------------------*/
31 /**
32  * \addtogroup mqtt-engine
33  * @{
34  */
35 /**
36  * \file
37  * Implementation of the Contiki MQTT engine
38  *
39  * \author
40  * Texas Instruments
41  */
42 /*---------------------------------------------------------------------------*/
43 #include "mqtt.h"
44 #include "contiki.h"
45 #include "contiki-net.h"
46 #include "contiki-lib.h"
47 #include "lib/random.h"
48 #include "sys/ctimer.h"
49 #include "sys/etimer.h"
50 #include "sys/pt.h"
51 #include "net/ipv6/uip.h"
52 #include "net/ipv6/uip-ds6.h"
53 #include "dev/leds.h"
54 
55 #include "tcp-socket.h"
56 
57 #include "lib/assert.h"
58 #include "lib/list.h"
59 #include "sys/cc.h"
60 
61 #include <stdlib.h>
62 #include <stdio.h>
63 #include <string.h>
64 /*---------------------------------------------------------------------------*/
65 #define DEBUG 0
66 #if DEBUG
67 #define PRINTF(...) printf(__VA_ARGS__)
68 #else
69 #define PRINTF(...)
70 #endif
71 /*---------------------------------------------------------------------------*/
72 typedef enum {
73  MQTT_FHDR_MSG_TYPE_CONNECT = 0x10,
74  MQTT_FHDR_MSG_TYPE_CONNACK = 0x20,
75  MQTT_FHDR_MSG_TYPE_PUBLISH = 0x30,
76  MQTT_FHDR_MSG_TYPE_PUBACK = 0x40,
77  MQTT_FHDR_MSG_TYPE_PUBREC = 0x50,
78  MQTT_FHDR_MSG_TYPE_PUBREL = 0x60,
79  MQTT_FHDR_MSG_TYPE_PUBCOMP = 0x70,
80  MQTT_FHDR_MSG_TYPE_SUBSCRIBE = 0x80,
81  MQTT_FHDR_MSG_TYPE_SUBACK = 0x90,
82  MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE = 0xA0,
83  MQTT_FHDR_MSG_TYPE_UNSUBACK = 0xB0,
84  MQTT_FHDR_MSG_TYPE_PINGREQ = 0xC0,
85  MQTT_FHDR_MSG_TYPE_PINGRESP = 0xD0,
86  MQTT_FHDR_MSG_TYPE_DISCONNECT = 0xE0,
87 
88  MQTT_FHDR_DUP_FLAG = 0x08,
89 
90  MQTT_FHDR_QOS_LEVEL_0 = 0x00,
91  MQTT_FHDR_QOS_LEVEL_1 = 0x02,
92  MQTT_FHDR_QOS_LEVEL_2 = 0x04,
93 
94  MQTT_FHDR_RETAIN_FLAG = 0x01,
95 } mqtt_fhdr_fields_t;
96 /*---------------------------------------------------------------------------*/
97 typedef enum {
98  MQTT_VHDR_USERNAME_FLAG = 0x80,
99  MQTT_VHDR_PASSWORD_FLAG = 0x40,
100 
101  MQTT_VHDR_WILL_RETAIN_FLAG = 0x20,
102  MQTT_VHDR_WILL_QOS_LEVEL_0 = 0x00,
103  MQTT_VHDR_WILL_QOS_LEVEL_1 = 0x08,
104  MQTT_VHDR_WILL_QOS_LEVEL_2 = 0x10,
105 
106  MQTT_VHDR_WILL_FLAG = 0x04,
107  MQTT_VHDR_CLEAN_SESSION_FLAG = 0x02,
108 } mqtt_vhdr_conn_fields_t;
109 /*---------------------------------------------------------------------------*/
110 typedef enum {
111  MQTT_VHDR_CONN_ACCEPTED,
112  MQTT_VHDR_CONN_REJECTED_PROTOCOL,
113  MQTT_VHDR_CONN_REJECTED_IDENTIFIER,
114  MQTT_VHDR_CONN_REJECTED_UNAVAILABLE,
115  MQTT_VHDR_CONN_REJECTED_BAD_USER_PASS,
116  MQTT_VHDR_CONN_REJECTED_UNAUTHORIZED,
117 } mqtt_vhdr_connack_fields_t;
118 /*---------------------------------------------------------------------------*/
119 #define MQTT_CONNECT_VHDR_FLAGS_SIZE 12
120 
121 #define MQTT_STRING_LEN_SIZE 2
122 #define MQTT_MID_SIZE 2
123 #define MQTT_QOS_SIZE 1
124 /*---------------------------------------------------------------------------*/
125 #define RESPONSE_WAIT_TIMEOUT (CLOCK_SECOND * 10)
126 /*---------------------------------------------------------------------------*/
127 #define INCREMENT_MID(conn) (conn)->mid_counter += 2
128 #define MQTT_STRING_LENGTH(s) (((s)->length) == 0 ? 0 : (MQTT_STRING_LEN_SIZE + (s)->length))
129 /*---------------------------------------------------------------------------*/
130 /* Protothread send macros */
131 #define PT_MQTT_WRITE_BYTES(conn, data, len) \
132  conn->out_write_pos = 0; \
133  while(write_bytes(conn, data, len)) { \
134  PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \
135  }
136 
137 #define PT_MQTT_WRITE_BYTE(conn, data) \
138  while(write_byte(conn, data)) { \
139  PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \
140  }
141 /*---------------------------------------------------------------------------*/
142 /*
143  * Sends the continue send event and wait for that event.
144  *
145  * The reason we cannot use PROCESS_PAUSE() is since we would risk loosing any
146  * events posted during the sending process.
147  */
148 #define PT_MQTT_WAIT_SEND() \
149  do { \
150  if (PROCESS_ERR_OK == \
151  process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL)) { \
152  do { \
153  PROCESS_WAIT_EVENT(); \
154  if(ev == mqtt_abort_now_event) { \
155  conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \
156  PT_INIT(&conn->out_proto_thread); \
157  process_post(PROCESS_CURRENT(), ev, data); \
158  } else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \
159  process_post(PROCESS_CURRENT(), ev, data); \
160  } \
161  } while (ev != mqtt_continue_send_event); \
162  } \
163  } while(0)
164 /*---------------------------------------------------------------------------*/
165 static process_event_t mqtt_do_connect_tcp_event;
166 static process_event_t mqtt_do_connect_mqtt_event;
167 static process_event_t mqtt_do_disconnect_mqtt_event;
168 static process_event_t mqtt_do_subscribe_event;
169 static process_event_t mqtt_do_unsubscribe_event;
170 static process_event_t mqtt_do_publish_event;
171 static process_event_t mqtt_do_pingreq_event;
172 static process_event_t mqtt_continue_send_event;
173 static process_event_t mqtt_abort_now_event;
174 process_event_t mqtt_update_event;
175 
176 /*
177  * Min and Max event numbers we want to acknowledge while we're in the process
178  * of doing something else. continue_send does not count, therefore must be
179  * allocated last
180  */
181 static process_event_t mqtt_event_min;
182 static process_event_t mqtt_event_max;
183 /*---------------------------------------------------------------------------*/
184 /* Prototypes */
185 static int
186 tcp_input(struct tcp_socket *s, void *ptr, const uint8_t *input_data_ptr,
187  int input_data_len);
188 
189 static void tcp_event(struct tcp_socket *s, void *ptr,
190  tcp_socket_event_t event);
191 
192 static void reset_packet(struct mqtt_in_packet *packet);
193 /*---------------------------------------------------------------------------*/
194 LIST(mqtt_conn_list);
195 /*---------------------------------------------------------------------------*/
196 PROCESS(mqtt_process, "MQTT process");
197 /*---------------------------------------------------------------------------*/
198 static void
199 call_event(struct mqtt_connection *conn,
200  mqtt_event_t event,
201  void *data)
202 {
203  conn->event_callback(conn, event, data);
204  process_post(conn->app_process, mqtt_update_event, NULL);
205 }
206 /*---------------------------------------------------------------------------*/
207 static void
208 reset_defaults(struct mqtt_connection *conn)
209 {
210  conn->mid_counter = 1;
211  PT_INIT(&conn->out_proto_thread);
212  conn->waiting_for_pingresp = 0;
213 
214  reset_packet(&conn->in_packet);
215  conn->out_buffer_sent = 0;
216 }
217 /*---------------------------------------------------------------------------*/
218 static void
219 abort_connection(struct mqtt_connection *conn)
220 {
221  conn->out_buffer_ptr = conn->out_buffer;
222  conn->out_queue_full = 0;
223 
224  /* Reset outgoing packet */
225  memset(&conn->out_packet, 0, sizeof(conn->out_packet));
226 
227  tcp_socket_close(&conn->socket);
228  tcp_socket_unregister(&conn->socket);
229 
230  memset(&conn->socket, 0, sizeof(conn->socket));
231 
232  conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
233 }
234 /*---------------------------------------------------------------------------*/
235 static void
236 connect_tcp(struct mqtt_connection *conn)
237 {
238  conn->state = MQTT_CONN_STATE_TCP_CONNECTING;
239 
240  reset_defaults(conn);
241  tcp_socket_register(&(conn->socket),
242  conn,
243  conn->in_buffer,
244  MQTT_TCP_INPUT_BUFF_SIZE,
245  conn->out_buffer,
246  MQTT_TCP_OUTPUT_BUFF_SIZE,
247  tcp_input,
248  tcp_event);
249  tcp_socket_connect(&(conn->socket), &(conn->server_ip), conn->server_port);
250 }
251 /*---------------------------------------------------------------------------*/
252 static void
253 disconnect_tcp(struct mqtt_connection *conn)
254 {
255  conn->state = MQTT_CONN_STATE_DISCONNECTING;
256  tcp_socket_close(&(conn->socket));
257  tcp_socket_unregister(&conn->socket);
258 
259  memset(&conn->socket, 0, sizeof(conn->socket));
260 }
261 /*---------------------------------------------------------------------------*/
262 static void
263 send_out_buffer(struct mqtt_connection *conn)
264 {
265  if(conn->out_buffer_ptr - conn->out_buffer == 0) {
266  conn->out_buffer_sent = 1;
267  return;
268  }
269  conn->out_buffer_sent = 0;
270 
271  DBG("MQTT - (send_out_buffer) Space used in buffer: %i\n",
272  conn->out_buffer_ptr - conn->out_buffer);
273 
274  tcp_socket_send(&conn->socket, conn->out_buffer,
275  conn->out_buffer_ptr - conn->out_buffer);
276 }
277 /*---------------------------------------------------------------------------*/
278 static void
279 string_to_mqtt_string(struct mqtt_string *mqtt_string, char *string)
280 {
281  if(mqtt_string == NULL) {
282  return;
283  }
284  mqtt_string->string = string;
285 
286  if(string != NULL) {
287  mqtt_string->length = strlen(string);
288  } else {
289  mqtt_string->length = 0;
290  }
291 }
292 /*---------------------------------------------------------------------------*/
293 static int
294 write_byte(struct mqtt_connection *conn, uint8_t data)
295 {
296  DBG("MQTT - (write_byte) buff_size: %i write: '%02X'\n",
297  &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
298  data);
299 
300  if(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr == 0) {
301  send_out_buffer(conn);
302  return 1;
303  }
304 
305  *conn->out_buffer_ptr = data;
306  conn->out_buffer_ptr++;
307  return 0;
308 }
309 /*---------------------------------------------------------------------------*/
310 static int
311 write_bytes(struct mqtt_connection *conn, uint8_t *data, uint16_t len)
312 {
313  uint16_t write_bytes;
314  write_bytes =
315  MIN(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
316  len - conn->out_write_pos);
317 
318  memcpy(conn->out_buffer_ptr, &data[conn->out_write_pos], write_bytes);
319  conn->out_write_pos += write_bytes;
320  conn->out_buffer_ptr += write_bytes;
321 
322  DBG("MQTT - (write_bytes) len: %u write_pos: %lu\n", len,
323  conn->out_write_pos);
324 
325  if(len - conn->out_write_pos == 0) {
326  conn->out_write_pos = 0;
327  return 0;
328  } else {
329  send_out_buffer(conn);
330  return len - conn->out_write_pos;
331  }
332 }
333 /*---------------------------------------------------------------------------*/
334 static void
335 encode_remaining_length(uint8_t *remaining_length,
336  uint8_t *remaining_length_bytes,
337  uint32_t length)
338 {
339  uint8_t digit;
340 
341  DBG("MQTT - Encoding length %lu\n", length);
342 
343  *remaining_length_bytes = 0;
344  do {
345  digit = length % 128;
346  length = length / 128;
347  if(length > 0) {
348  digit = digit | 0x80;
349  }
350 
351  remaining_length[*remaining_length_bytes] = digit;
352  (*remaining_length_bytes)++;
353  DBG("MQTT - Encode len digit '%u' length '%lu'\n", digit, length);
354  } while(length > 0 && *remaining_length_bytes < 5);
355  DBG("MQTT - remaining_length_bytes %u\n", *remaining_length_bytes);
356 }
357 /*---------------------------------------------------------------------------*/
358 static void
359 keep_alive_callback(void *ptr)
360 {
361  struct mqtt_connection *conn = ptr;
362 
363  DBG("MQTT - (keep_alive_callback) Called!\n");
364 
365  /* The flag is set when the PINGREQ has been sent */
366  if(conn->waiting_for_pingresp) {
367  PRINTF("MQTT - Disconnect due to no PINGRESP from broker.\n");
368  disconnect_tcp(conn);
369  return;
370  }
371 
372  process_post(&mqtt_process, mqtt_do_pingreq_event, conn);
373 }
374 /*---------------------------------------------------------------------------*/
375 static void
376 reset_packet(struct mqtt_in_packet *packet)
377 {
378  memset(packet, 0, sizeof(struct mqtt_in_packet));
379  packet->remaining_multiplier = 1;
380 }
381 /*---------------------------------------------------------------------------*/
382 static
383 PT_THREAD(connect_pt(struct pt *pt, struct mqtt_connection *conn))
384 {
385  PT_BEGIN(pt);
386 
387  DBG("MQTT - Sending CONNECT message...\n");
388 
389  /* Set up FHDR */
390  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_CONNECT;
391  conn->out_packet.remaining_length = 0;
392  conn->out_packet.remaining_length += MQTT_CONNECT_VHDR_FLAGS_SIZE;
393  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->client_id);
394  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.username);
395  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.password);
396  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.topic);
397  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.message);
398  encode_remaining_length(conn->out_packet.remaining_length_enc,
399  &conn->out_packet.remaining_length_enc_bytes,
400  conn->out_packet.remaining_length);
401  if(conn->out_packet.remaining_length_enc_bytes > 4) {
402  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
403  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
404  PT_EXIT(pt);
405  }
406 
407  /* Write Fixed Header */
408  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
409  PT_MQTT_WRITE_BYTES(conn,
410  conn->out_packet.remaining_length_enc,
411  conn->out_packet.remaining_length_enc_bytes);
412  PT_MQTT_WRITE_BYTE(conn, 0);
413  PT_MQTT_WRITE_BYTE(conn, 6);
414  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)MQTT_PROTOCOL_NAME, 6);
415  PT_MQTT_WRITE_BYTE(conn, MQTT_PROTOCOL_VERSION);
416  PT_MQTT_WRITE_BYTE(conn, conn->connect_vhdr_flags);
417  PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive >> 8));
418  PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive & 0x00FF));
419  PT_MQTT_WRITE_BYTE(conn, conn->client_id.length << 8);
420  PT_MQTT_WRITE_BYTE(conn, conn->client_id.length & 0x00FF);
421  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->client_id.string,
422  conn->client_id.length);
423  if(conn->connect_vhdr_flags & MQTT_VHDR_WILL_FLAG) {
424  PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length << 8);
425  PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length & 0x00FF);
426  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.topic.string,
427  conn->will.topic.length);
428  PT_MQTT_WRITE_BYTE(conn, conn->will.message.length << 8);
429  PT_MQTT_WRITE_BYTE(conn, conn->will.message.length & 0x00FF);
430  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.message.string,
431  conn->will.message.length);
432  DBG("MQTT - Setting will topic to '%s' %u bytes and message to '%s' %u bytes\n",
433  conn->will.topic.string,
434  conn->will.topic.length,
435  conn->will.message.string,
436  conn->will.message.length);
437  }
438  if(conn->connect_vhdr_flags & MQTT_VHDR_USERNAME_FLAG) {
439  PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length << 8);
440  PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length & 0x00FF);
441  PT_MQTT_WRITE_BYTES(conn,
442  (uint8_t *)conn->credentials.username.string,
443  conn->credentials.username.length);
444  }
445  if(conn->connect_vhdr_flags & MQTT_VHDR_PASSWORD_FLAG) {
446  PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length << 8);
447  PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length & 0x00FF);
448  PT_MQTT_WRITE_BYTES(conn,
449  (uint8_t *)conn->credentials.password.string,
450  conn->credentials.password.length);
451  }
452 
453  /* Send out buffer */
454  send_out_buffer(conn);
455  conn->state = MQTT_CONN_STATE_CONNECTING_TO_BROKER;
456 
457  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
458 
459  /* Wait for CONNACK */
460  reset_packet(&conn->in_packet);
461  PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
462  timer_expired(&conn->t));
463  if(timer_expired(&conn->t)) {
464  DBG("Timeout waiting for CONNACK\n");
465  /* We stick to the letter of the spec here: Tear the connection down */
466  mqtt_disconnect(conn);
467  }
468  reset_packet(&conn->in_packet);
469 
470  DBG("MQTT - Done sending CONNECT\n");
471 
472 #if DEBUG_MQTT == 1
473  DBG("MQTT - CONNECT message sent: \n");
474  uint16_t i;
475  for(i = 0; i < (conn->out_buffer_ptr - conn->out_buffer); i++) {
476  DBG("%02X ", conn->out_buffer[i]);
477  }
478  DBG("\n");
479 #endif
480 
481  PT_END(pt);
482 }
483 /*---------------------------------------------------------------------------*/
484 static
485 PT_THREAD(disconnect_pt(struct pt *pt, struct mqtt_connection *conn))
486 {
487  PT_BEGIN(pt);
488 
489  PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_DISCONNECT);
490  PT_MQTT_WRITE_BYTE(conn, 0);
491 
492  send_out_buffer(conn);
493 
494  /*
495  * Wait a couple of seconds for a TCP ACK. We don't really need the ACK,
496  * we do want the TCP/IP stack to actually send this disconnect before we
497  * tear down the session.
498  */
499  timer_set(&conn->t, (CLOCK_SECOND * 2));
500  PT_WAIT_UNTIL(pt, conn->out_buffer_sent || timer_expired(&conn->t));
501 
502  PT_END(pt);
503 }
504 /*---------------------------------------------------------------------------*/
505 static
506 PT_THREAD(subscribe_pt(struct pt *pt, struct mqtt_connection *conn))
507 {
508  PT_BEGIN(pt);
509 
510  DBG("MQTT - Sending subscribe message! topic %s topic_length %i\n",
511  conn->out_packet.topic,
512  conn->out_packet.topic_length);
513  DBG("MQTT - Buffer space is %i \n",
514  &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
515 
516  /* Set up FHDR */
517  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_SUBSCRIBE | MQTT_FHDR_QOS_LEVEL_1;
518  conn->out_packet.remaining_length = MQTT_MID_SIZE +
519  MQTT_STRING_LEN_SIZE +
520  conn->out_packet.topic_length +
521  MQTT_QOS_SIZE;
522  encode_remaining_length(conn->out_packet.remaining_length_enc,
523  &conn->out_packet.remaining_length_enc_bytes,
524  conn->out_packet.remaining_length);
525  if(conn->out_packet.remaining_length_enc_bytes > 4) {
526  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
527  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
528  PT_EXIT(pt);
529  }
530 
531  /* Write Fixed Header */
532  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
533  PT_MQTT_WRITE_BYTES(conn,
534  conn->out_packet.remaining_length_enc,
535  conn->out_packet.remaining_length_enc_bytes);
536  /* Write Variable Header */
537  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid << 8));
538  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
539  /* Write Payload */
540  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
541  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
542  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
543  conn->out_packet.topic_length);
544  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.qos);
545 
546  /* Send out buffer */
547  send_out_buffer(conn);
548  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
549 
550  /* Wait for SUBACK. */
551  reset_packet(&conn->in_packet);
552  PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
553  timer_expired(&conn->t));
554 
555  if(timer_expired(&conn->t)) {
556  DBG("Timeout waiting for SUBACK\n");
557  }
558  reset_packet(&conn->in_packet);
559 
560  /* This is clear after the entire transaction is complete */
561  conn->out_queue_full = 0;
562 
563  DBG("MQTT - Done in send_subscribe!\n");
564 
565  PT_END(pt);
566 }
567 /*---------------------------------------------------------------------------*/
568 static
569 PT_THREAD(unsubscribe_pt(struct pt *pt, struct mqtt_connection *conn))
570 {
571  PT_BEGIN(pt);
572 
573  DBG("MQTT - Sending unsubscribe message on topic %s topic_length %i\n",
574  conn->out_packet.topic,
575  conn->out_packet.topic_length);
576  DBG("MQTT - Buffer space is %i \n",
577  &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
578 
579  /* Set up FHDR */
580  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE |
581  MQTT_FHDR_QOS_LEVEL_1;
582  conn->out_packet.remaining_length = MQTT_MID_SIZE +
583  MQTT_STRING_LEN_SIZE +
584  conn->out_packet.topic_length;
585  encode_remaining_length(conn->out_packet.remaining_length_enc,
586  &conn->out_packet.remaining_length_enc_bytes,
587  conn->out_packet.remaining_length);
588  if(conn->out_packet.remaining_length_enc_bytes > 4) {
589  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
590  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
591  PT_EXIT(pt);
592  }
593 
594  /* Write Fixed Header */
595  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
596  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
597  conn->out_packet.remaining_length_enc_bytes);
598  /* Write Variable Header */
599  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid << 8));
600  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
601  /* Write Payload */
602  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
603  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
604  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
605  conn->out_packet.topic_length);
606 
607  /* Send out buffer */
608  send_out_buffer(conn);
609  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
610 
611  /* Wait for UNSUBACK */
612  reset_packet(&conn->in_packet);
613  PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
614  timer_expired(&conn->t));
615 
616  if(timer_expired(&conn->t)) {
617  DBG("Timeout waiting for UNSUBACK\n");
618  }
619 
620  reset_packet(&conn->in_packet);
621 
622  /* This is clear after the entire transaction is complete */
623  conn->out_queue_full = 0;
624 
625  DBG("MQTT - Done writing subscribe message to out buffer!\n");
626 
627  PT_END(pt);
628 }
629 /*---------------------------------------------------------------------------*/
630 static
631 PT_THREAD(publish_pt(struct pt *pt, struct mqtt_connection *conn))
632 {
633  PT_BEGIN(pt);
634 
635  DBG("MQTT - Sending publish message! topic %s topic_length %i\n",
636  conn->out_packet.topic,
637  conn->out_packet.topic_length);
638  DBG("MQTT - Buffer space is %i \n",
639  &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
640 
641  /* Set up FHDR */
642  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_PUBLISH |
643  conn->out_packet.qos << 1;
644  if(conn->out_packet.retain == MQTT_RETAIN_ON) {
645  conn->out_packet.fhdr |= MQTT_FHDR_RETAIN_FLAG;
646  }
647  conn->out_packet.remaining_length = MQTT_STRING_LEN_SIZE +
648  conn->out_packet.topic_length +
649  conn->out_packet.payload_size;
650  if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
651  conn->out_packet.remaining_length += MQTT_MID_SIZE;
652  }
653  encode_remaining_length(conn->out_packet.remaining_length_enc,
654  &conn->out_packet.remaining_length_enc_bytes,
655  conn->out_packet.remaining_length);
656  if(conn->out_packet.remaining_length_enc_bytes > 4) {
657  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
658  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
659  PT_EXIT(pt);
660  }
661 
662  /* Write Fixed Header */
663  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
664  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
665  conn->out_packet.remaining_length_enc_bytes);
666  /* Write Variable Header */
667  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
668  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
669  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
670  conn->out_packet.topic_length);
671  if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
672  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid << 8));
673  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
674  }
675  /* Write Payload */
676  PT_MQTT_WRITE_BYTES(conn,
677  conn->out_packet.payload,
678  conn->out_packet.payload_size);
679 
680  send_out_buffer(conn);
681  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
682 
683  /*
684  * If QoS is zero then wait until the message has been sent, since there is
685  * no ACK to wait for.
686  *
687  * Also notify the app will not be notified via PUBACK or PUBCOMP
688  */
689  if(conn->out_packet.qos == 0) {
690  process_post(conn->app_process, mqtt_update_event, NULL);
691  } else if(conn->out_packet.qos == 1) {
692  /* Wait for PUBACK */
693  reset_packet(&conn->in_packet);
694  PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
695  timer_expired(&conn->t));
696  if(timer_expired(&conn->t)) {
697  DBG("Timeout waiting for PUBACK\n");
698  }
699  if(conn->in_packet.mid != conn->out_packet.mid) {
700  DBG("MQTT - Warning, got PUBACK with none matching MID. Currently there "
701  "is no support for several concurrent PUBLISH messages.\n");
702  }
703  } else if(conn->out_packet.qos == 2) {
704  DBG("MQTT - QoS not implemented yet.\n");
705  /* Should wait for PUBREC, send PUBREL and then wait for PUBCOMP */
706  }
707 
708  reset_packet(&conn->in_packet);
709 
710  /* This is clear after the entire transaction is complete */
711  conn->out_queue_full = 0;
712 
713  DBG("MQTT - Publish Enqueued\n");
714 
715  PT_END(pt);
716 }
717 /*---------------------------------------------------------------------------*/
718 static
719 PT_THREAD(pingreq_pt(struct pt *pt, struct mqtt_connection *conn))
720 {
721  PT_BEGIN(pt);
722 
723  DBG("MQTT - Sending PINGREQ\n");
724 
725  /* Write Fixed Header */
726  PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_PINGREQ);
727  PT_MQTT_WRITE_BYTE(conn, 0);
728 
729  send_out_buffer(conn);
730 
731  /* Start timeout for reply. */
732  conn->waiting_for_pingresp = 1;
733 
734  /* Wait for PINGRESP or timeout */
735  reset_packet(&conn->in_packet);
736  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
737 
738  PT_WAIT_UNTIL(pt, conn->in_packet.packet_received || timer_expired(&conn->t));
739 
740  reset_packet(&conn->in_packet);
741 
742  conn->waiting_for_pingresp = 0;
743 
744  PT_END(pt);
745 }
746 /*---------------------------------------------------------------------------*/
747 static void
748 handle_connack(struct mqtt_connection *conn)
749 {
750  DBG("MQTT - Got CONNACK\n");
751 
752  if(conn->in_packet.payload[1] != 0) {
753  PRINTF("MQTT - Connection refused with Return Code %i\n",
754  conn->in_packet.payload[1]);
755  call_event(conn,
756  MQTT_EVENT_CONNECTION_REFUSED_ERROR,
757  &conn->in_packet.payload[1]);
758  abort_connection(conn);
759  return;
760  }
761 
762  conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
763 
764  ctimer_set(&conn->keep_alive_timer, conn->keep_alive * CLOCK_SECOND,
765  keep_alive_callback, conn);
766 
767  /* Always reset packet before callback since it might be used directly */
768  conn->state = MQTT_CONN_STATE_CONNECTED_TO_BROKER;
769  call_event(conn, MQTT_EVENT_CONNECTED, NULL);
770 }
771 /*---------------------------------------------------------------------------*/
772 static void
773 handle_pingresp(struct mqtt_connection *conn)
774 {
775  DBG("MQTT - Got RINGRESP\n");
776 }
777 /*---------------------------------------------------------------------------*/
778 static void
779 handle_suback(struct mqtt_connection *conn)
780 {
781  struct mqtt_suback_event suback_event;
782 
783  DBG("MQTT - Got SUBACK\n");
784 
785  /* Only accept SUBACKS with X topic QoS response, assume 1 */
786  if(conn->in_packet.remaining_length > MQTT_MID_SIZE +
787  MQTT_MAX_TOPICS_PER_SUBSCRIBE * MQTT_QOS_SIZE) {
788  DBG("MQTT - Error, SUBACK with > 1 topic, not supported.\n");
789  }
790 
791  conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
792 
793  suback_event.mid = (conn->in_packet.payload[0] << 8) |
794  (conn->in_packet.payload[1]);
795  suback_event.qos_level = conn->in_packet.payload[2];
796  conn->in_packet.mid = suback_event.mid;
797 
798  if(conn->in_packet.mid != conn->out_packet.mid) {
799  DBG("MQTT - Warning, got SUBACK with none matching MID. Currently there is"
800  "no support for several concurrent SUBSCRIBE messages.\n");
801  }
802 
803  /* Always reset packet before callback since it might be used directly */
804  call_event(conn, MQTT_EVENT_SUBACK, &suback_event);
805 }
806 /*---------------------------------------------------------------------------*/
807 static void
808 handle_unsuback(struct mqtt_connection *conn)
809 {
810  DBG("MQTT - Got UNSUBACK\n");
811 
812  conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
813  conn->in_packet.mid = (conn->in_packet.payload[0] << 8) |
814  (conn->in_packet.payload[1]);
815 
816  if(conn->in_packet.mid != conn->out_packet.mid) {
817  DBG("MQTT - Warning, got UNSUBACK with none matching MID. Currently there is"
818  "no support for several concurrent UNSUBSCRIBE messages.\n");
819  }
820 
821  call_event(conn, MQTT_EVENT_UNSUBACK, &conn->in_packet.mid);
822 }
823 /*---------------------------------------------------------------------------*/
824 static void
825 handle_puback(struct mqtt_connection *conn)
826 {
827  DBG("MQTT - Got PUBACK\n");
828 
829  conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
830  conn->in_packet.mid = (conn->in_packet.payload[0] << 8) |
831  (conn->in_packet.payload[1]);
832 
833  call_event(conn, MQTT_EVENT_PUBACK, &conn->in_packet.mid);
834 }
835 /*---------------------------------------------------------------------------*/
836 static void
837 handle_publish(struct mqtt_connection *conn)
838 {
839  DBG("MQTT - Got PUBLISH, called once per manageable chunk of message.\n");
840  DBG("MQTT - Handling publish on topic '%s'\n", conn->in_publish_msg.topic);
841 
842  DBG("MQTT - This chunk is %i bytes\n", conn->in_packet.payload_pos);
843 
844  if(((conn->in_packet.fhdr & 0x09) >> 1) > 0) {
845  PRINTF("MQTT - Error, got incoming PUBLISH with QoS > 0, not supported atm!\n");
846  }
847 
848  call_event(conn, MQTT_EVENT_PUBLISH, &conn->in_publish_msg);
849 
850  if(conn->in_publish_msg.first_chunk == 1) {
851  conn->in_publish_msg.first_chunk = 0;
852  }
853 
854  /* If this is the last time handle_publish will be called, reset packet. */
855  if(conn->in_publish_msg.payload_left == 0) {
856 
857  /* Check for QoS and initiate the reply, do not rely on the data in the
858  * in_packet being untouched. */
859 
860  DBG("MQTT - (handle_publish) resetting packet.\n");
861  reset_packet(&conn->in_packet);
862  }
863 }
864 /*---------------------------------------------------------------------------*/
865 static void
866 parse_publish_vhdr(struct mqtt_connection *conn,
867  uint32_t *pos,
868  const uint8_t *input_data_ptr,
869  int input_data_len)
870 {
871  uint16_t copy_bytes;
872 
873  /* Read out topic length */
874  if(conn->in_packet.topic_len_received == 0) {
875  conn->in_packet.topic_pos = 0;
876  conn->in_packet.topic_len = (input_data_ptr[(*pos)++] << 8);
877  conn->in_packet.byte_counter++;
878  if(*pos >= input_data_len) {
879  return;
880  }
881  conn->in_packet.topic_len |= input_data_ptr[(*pos)++];
882  conn->in_packet.byte_counter++;
883  conn->in_packet.topic_len_received = 1;
884  /* Abort if topic is longer than our topic buffer */
885  if(conn->in_packet.topic_len > MQTT_MAX_TOPIC_LENGTH) {
886  DBG("MQTT - topic too long %u/%u\n", conn->in_packet.topic_len, MQTT_MAX_TOPIC_LENGTH);
887  return;
888  }
889  DBG("MQTT - Read PUBLISH topic len %i\n", conn->in_packet.topic_len);
890  /* WARNING: Check here if TOPIC fits in payload area, otherwise error */
891  }
892 
893  /* Read out topic */
894  if(conn->in_packet.topic_len_received == 1 &&
895  conn->in_packet.topic_received == 0) {
896  copy_bytes = MIN(conn->in_packet.topic_len - conn->in_packet.topic_pos,
897  input_data_len - *pos);
898  DBG("MQTT - topic_pos: %i copy_bytes: %i", conn->in_packet.topic_pos,
899  copy_bytes);
900  memcpy(&conn->in_publish_msg.topic[conn->in_packet.topic_pos],
901  &input_data_ptr[*pos],
902  copy_bytes);
903  (*pos) += copy_bytes;
904  conn->in_packet.byte_counter += copy_bytes;
905  conn->in_packet.topic_pos += copy_bytes;
906 
907  if(conn->in_packet.topic_len - conn->in_packet.topic_pos == 0) {
908  DBG("MQTT - Got topic '%s'", conn->in_publish_msg.topic);
909  conn->in_packet.topic_received = 1;
910  conn->in_publish_msg.topic[conn->in_packet.topic_pos] = '\0';
911  conn->in_publish_msg.payload_length =
912  conn->in_packet.remaining_length - conn->in_packet.topic_len - 2;
913  conn->in_publish_msg.payload_left = conn->in_publish_msg.payload_length;
914  }
915 
916  /* Set this once per incomming publish message */
917  conn->in_publish_msg.first_chunk = 1;
918  }
919 }
920 /*---------------------------------------------------------------------------*/
921 static int
922 tcp_input(struct tcp_socket *s,
923  void *ptr,
924  const uint8_t *input_data_ptr,
925  int input_data_len)
926 {
927  struct mqtt_connection *conn = ptr;
928  uint32_t pos = 0;
929  uint32_t copy_bytes = 0;
930  uint8_t byte;
931 
932  if(input_data_len == 0) {
933  return 0;
934  }
935 
936  if(conn->in_packet.packet_received) {
937  reset_packet(&conn->in_packet);
938  }
939 
940  DBG("tcp_input with %i bytes of data:\n", input_data_len);
941 
942  /* Read the fixed header field, if we do not have it */
943  if(!conn->in_packet.fhdr) {
944  conn->in_packet.fhdr = input_data_ptr[pos++];
945  conn->in_packet.byte_counter++;
946 
947  DBG("MQTT - Read VHDR '%02X'\n", conn->in_packet.fhdr);
948 
949  if(pos >= input_data_len) {
950  return 0;
951  }
952  }
953 
954  /* Read the Remaining Length field, if we do not have it */
955  if(!conn->in_packet.has_remaining_length) {
956  do {
957  if(pos >= input_data_len) {
958  return 0;
959  }
960 
961  byte = input_data_ptr[pos++];
962  conn->in_packet.byte_counter++;
963  conn->in_packet.remaining_length_bytes++;
964  DBG("MQTT - Read Remaining Length byte\n");
965 
966  if(conn->in_packet.byte_counter > 5) {
967  call_event(conn, MQTT_EVENT_ERROR, NULL);
968  DBG("Received more then 4 byte 'remaining lenght'.");
969  return 0;
970  }
971 
972  conn->in_packet.remaining_length +=
973  (byte & 127) * conn->in_packet.remaining_multiplier;
974  conn->in_packet.remaining_multiplier *= 128;
975  } while((byte & 128) != 0);
976 
977  DBG("MQTT - Finished reading remaining length byte\n");
978  conn->in_packet.has_remaining_length = 1;
979  }
980 
981  /*
982  * Check for unsupported payload length. Will read all incoming data from the
983  * server in any case and then reset the packet.
984  *
985  * TODO: Decide if we, for example, want to disconnect instead.
986  */
987  if((conn->in_packet.remaining_length > MQTT_INPUT_BUFF_SIZE) &&
988  (conn->in_packet.fhdr & 0xF0) != MQTT_FHDR_MSG_TYPE_PUBLISH) {
989 
990  PRINTF("MQTT - Error, unsupported payload size for non-PUBLISH message\n");
991 
992  conn->in_packet.byte_counter += input_data_len;
993  if(conn->in_packet.byte_counter >=
994  (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
995  conn->in_packet.packet_received = 1;
996  }
997  return 0;
998  }
999 
1000  /*
1001  * Supported payload, reads out both VHDR and Payload of all packets.
1002  *
1003  * Note: There will always be at least one byte left to read when we enter
1004  * this loop.
1005  */
1006  while(conn->in_packet.byte_counter <
1007  (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
1008 
1009  if((conn->in_packet.fhdr & 0xF0) == MQTT_FHDR_MSG_TYPE_PUBLISH &&
1010  conn->in_packet.topic_received == 0) {
1011  parse_publish_vhdr(conn, &pos, input_data_ptr, input_data_len);
1012  }
1013 
1014  /* Read in as much as we can into the packet payload */
1015  copy_bytes = MIN(input_data_len - pos,
1016  MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos);
1017  DBG("- Copied %lu payload bytes\n", copy_bytes);
1018  memcpy(&conn->in_packet.payload[conn->in_packet.payload_pos],
1019  &input_data_ptr[pos],
1020  copy_bytes);
1021  conn->in_packet.byte_counter += copy_bytes;
1022  conn->in_packet.payload_pos += copy_bytes;
1023  pos += copy_bytes;
1024 
1025  uint8_t i;
1026  DBG("MQTT - Copied bytes: \n");
1027  for(i = 0; i < copy_bytes; i++) {
1028  DBG("%02X ", conn->in_packet.payload[i]);
1029  }
1030  DBG("\n");
1031 
1032  /* Full buffer, shall only happen to PUBLISH messages. */
1033  if(MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos == 0) {
1034  conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1035  conn->in_publish_msg.payload_chunk_length = MQTT_INPUT_BUFF_SIZE;
1036  conn->in_publish_msg.payload_left -= MQTT_INPUT_BUFF_SIZE;
1037 
1038  handle_publish(conn);
1039 
1040  conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1041  conn->in_packet.payload_pos = 0;
1042  }
1043 
1044  if(pos >= input_data_len &&
1045  (conn->in_packet.byte_counter < (MQTT_FHDR_SIZE + conn->in_packet.remaining_length))) {
1046  return 0;
1047  }
1048  }
1049 
1050  /* Debug information */
1051  DBG("\n");
1052  /* Take care of input */
1053  DBG("MQTT - Finished reading packet!\n");
1054  /* What to return? */
1055  DBG("MQTT - total data was %i bytes of data. \n",
1056  (MQTT_FHDR_SIZE + conn->in_packet.remaining_length));
1057 
1058  /* Handle packet here. */
1059  switch(conn->in_packet.fhdr & 0xF0) {
1060  case MQTT_FHDR_MSG_TYPE_CONNACK:
1061  handle_connack(conn);
1062  break;
1063  case MQTT_FHDR_MSG_TYPE_PUBLISH:
1064  /* This is the only or the last chunk of publish payload */
1065  conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1066  conn->in_publish_msg.payload_chunk_length = conn->in_packet.payload_pos;
1067  conn->in_publish_msg.payload_left = 0;
1068  handle_publish(conn);
1069  break;
1070  case MQTT_FHDR_MSG_TYPE_PUBACK:
1071  handle_puback(conn);
1072  break;
1073  case MQTT_FHDR_MSG_TYPE_SUBACK:
1074  handle_suback(conn);
1075  break;
1076  case MQTT_FHDR_MSG_TYPE_UNSUBACK:
1077  handle_unsuback(conn);
1078  break;
1079  case MQTT_FHDR_MSG_TYPE_PINGRESP:
1080  handle_pingresp(conn);
1081  break;
1082 
1083  /* QoS 2 not implemented yet */
1084  case MQTT_FHDR_MSG_TYPE_PUBREC:
1085  case MQTT_FHDR_MSG_TYPE_PUBREL:
1086  case MQTT_FHDR_MSG_TYPE_PUBCOMP:
1087  call_event(conn, MQTT_EVENT_NOT_IMPLEMENTED_ERROR, NULL);
1088  PRINTF("MQTT - Got unhandled MQTT Message Type '%i'",
1089  (conn->in_packet.fhdr & 0xF0));
1090  break;
1091 
1092  default:
1093  /* All server-only message */
1094  PRINTF("MQTT - Got MQTT Message Type '%i'", (conn->in_packet.fhdr & 0xF0));
1095  break;
1096  }
1097 
1098  conn->in_packet.packet_received = 1;
1099 
1100  return 0;
1101 }
1102 /*---------------------------------------------------------------------------*/
1103 /*
1104  * Handles TCP events from Simple TCP
1105  */
1106 static void
1107 tcp_event(struct tcp_socket *s, void *ptr, tcp_socket_event_t event)
1108 {
1109  struct mqtt_connection *conn = ptr;
1110 
1111  /* Take care of event */
1112  switch(event) {
1113 
1114  /* Fall through to manage different disconnect event the same way. */
1115  case TCP_SOCKET_CLOSED:
1116  case TCP_SOCKET_TIMEDOUT:
1117  case TCP_SOCKET_ABORTED: {
1118 
1119  DBG("MQTT - Disconnected by tcp event %d\n", event);
1120  process_post(&mqtt_process, mqtt_abort_now_event, conn);
1121  conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
1122  ctimer_stop(&conn->keep_alive_timer);
1123  call_event(conn, MQTT_EVENT_DISCONNECTED, &event);
1124  abort_connection(conn);
1125 
1126  /* If connecting retry */
1127  if(conn->auto_reconnect == 1) {
1128  connect_tcp(conn);
1129  }
1130  break;
1131  }
1132  case TCP_SOCKET_CONNECTED: {
1133  conn->state = MQTT_CONN_STATE_TCP_CONNECTED;
1134  conn->out_buffer_sent = 1;
1135 
1136  process_post(&mqtt_process, mqtt_do_connect_mqtt_event, conn);
1137  break;
1138  }
1139  case TCP_SOCKET_DATA_SENT: {
1140  DBG("MQTT - Got TCP_DATA_SENT\n");
1141 
1142  if(conn->socket.output_data_len == 0) {
1143  conn->out_buffer_sent = 1;
1144  conn->out_buffer_ptr = conn->out_buffer;
1145  }
1146 
1147  ctimer_restart(&conn->keep_alive_timer);
1148  break;
1149  }
1150 
1151  default: {
1152  DBG("MQTT - TCP Event %d is currently not managed by the tcp event callback\n",
1153  event);
1154  }
1155  }
1156 }
1157 /*---------------------------------------------------------------------------*/
1158 PROCESS_THREAD(mqtt_process, ev, data)
1159 {
1160  static struct mqtt_connection *conn;
1161 
1162  PROCESS_BEGIN();
1163 
1164  while(1) {
1166 
1167  if(ev == mqtt_abort_now_event) {
1168  DBG("MQTT - Abort\n");
1169  conn = data;
1170  conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE;
1171 
1172  abort_connection(conn);
1173  }
1174  if(ev == mqtt_do_connect_tcp_event) {
1175  conn = data;
1176  DBG("MQTT - Got mqtt_do_connect_tcp_event!\n");
1177  connect_tcp(conn);
1178  }
1179  if(ev == mqtt_do_connect_mqtt_event) {
1180  conn = data;
1181  conn->socket.output_data_max_seg = conn->max_segment_size;
1182  DBG("MQTT - Got mqtt_do_connect_mqtt_event!\n");
1183 
1184  if(conn->out_buffer_sent == 1) {
1185  PT_INIT(&conn->out_proto_thread);
1186  while(connect_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1187  conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) {
1188  PT_MQTT_WAIT_SEND();
1189  }
1190  }
1191  }
1192  if(ev == mqtt_do_disconnect_mqtt_event) {
1193  conn = data;
1194  DBG("MQTT - Got mqtt_do_disconnect_mqtt_event!\n");
1195 
1196  /* Send MQTT Disconnect if we are connected */
1197  if(conn->state == MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT) {
1198  if(conn->out_buffer_sent == 1) {
1199  PT_INIT(&conn->out_proto_thread);
1200  while(conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE &&
1201  disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1202  PT_MQTT_WAIT_SEND();
1203  }
1204  abort_connection(conn);
1205  call_event(conn, MQTT_EVENT_DISCONNECTED, &ev);
1206  } else {
1207  process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
1208  }
1209  }
1210  }
1211  if(ev == mqtt_do_pingreq_event) {
1212  conn = data;
1213  DBG("MQTT - Got mqtt_do_pingreq_event!\n");
1214 
1215  if(conn->out_buffer_sent == 1 &&
1216  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1217  PT_INIT(&conn->out_proto_thread);
1218  while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1219  pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1220  PT_MQTT_WAIT_SEND();
1221  }
1222  }
1223  }
1224  if(ev == mqtt_do_subscribe_event) {
1225  conn = data;
1226  DBG("MQTT - Got mqtt_do_subscribe_mqtt_event!\n");
1227 
1228  if(conn->out_buffer_sent == 1 &&
1229  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1230  PT_INIT(&conn->out_proto_thread);
1231  while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1232  subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1233  PT_MQTT_WAIT_SEND();
1234  }
1235  }
1236  }
1237  if(ev == mqtt_do_unsubscribe_event) {
1238  conn = data;
1239  DBG("MQTT - Got mqtt_do_unsubscribe_mqtt_event!\n");
1240 
1241  if(conn->out_buffer_sent == 1 &&
1242  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1243  PT_INIT(&conn->out_proto_thread);
1244  while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1245  unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1246  PT_MQTT_WAIT_SEND();
1247  }
1248  }
1249  }
1250  if(ev == mqtt_do_publish_event) {
1251  conn = data;
1252  DBG("MQTT - Got mqtt_do_publish_mqtt_event!\n");
1253 
1254  if(conn->out_buffer_sent == 1 &&
1255  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1256  PT_INIT(&conn->out_proto_thread);
1257  while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
1258  publish_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
1259  PT_MQTT_WAIT_SEND();
1260  }
1261  }
1262  }
1263  }
1264  PROCESS_END();
1265 }
1266 /*---------------------------------------------------------------------------*/
1267 void
1268 mqtt_init(void)
1269 {
1270  static uint8_t inited = 0;
1271  if(!inited) {
1272  mqtt_do_connect_tcp_event = process_alloc_event();
1273  mqtt_event_min = mqtt_do_connect_tcp_event;
1274 
1275  mqtt_do_connect_mqtt_event = process_alloc_event();
1276  mqtt_do_disconnect_mqtt_event = process_alloc_event();
1277  mqtt_do_subscribe_event = process_alloc_event();
1278  mqtt_do_unsubscribe_event = process_alloc_event();
1279  mqtt_do_publish_event = process_alloc_event();
1280  mqtt_do_pingreq_event = process_alloc_event();
1281  mqtt_update_event = process_alloc_event();
1282  mqtt_abort_now_event = process_alloc_event();
1283  mqtt_event_max = mqtt_abort_now_event;
1284 
1285  mqtt_continue_send_event = process_alloc_event();
1286 
1287  list_init(mqtt_conn_list);
1288  process_start(&mqtt_process, NULL);
1289  inited = 1;
1290  }
1291 }
1292 /*---------------------------------------------------------------------------*/
1293 mqtt_status_t
1294 mqtt_register(struct mqtt_connection *conn, struct process *app_process,
1295  char *client_id, mqtt_event_callback_t event_callback,
1296  uint16_t max_segment_size)
1297 {
1298  if(strlen(client_id) < 1) {
1299  return MQTT_STATUS_INVALID_ARGS_ERROR;
1300  }
1301 
1302  /* Set defaults - Set all to zero to begin with */
1303  memset(conn, 0, sizeof(struct mqtt_connection));
1304  string_to_mqtt_string(&conn->client_id, client_id);
1305  conn->event_callback = event_callback;
1306  conn->app_process = app_process;
1307  conn->auto_reconnect = 1;
1308  conn->max_segment_size = max_segment_size;
1309  reset_defaults(conn);
1310 
1311  mqtt_init();
1312  list_add(mqtt_conn_list, conn);
1313 
1314  DBG("MQTT - Registered successfully\n");
1315 
1316  return MQTT_STATUS_OK;
1317 }
1318 /*---------------------------------------------------------------------------*/
1319 /*
1320  * Connect to MQTT broker.
1321  *
1322  * N.B. Non-blocking call.
1323  */
1324 mqtt_status_t
1325 mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port,
1326  uint16_t keep_alive)
1327 {
1328  uip_ip6addr_t ip6addr;
1329  uip_ipaddr_t *ipaddr;
1330  ipaddr = &ip6addr;
1331 
1332  /* Check if we are already trying to connect */
1333  if(conn->state > MQTT_CONN_STATE_NOT_CONNECTED) {
1334  return MQTT_STATUS_OK;
1335  }
1336 
1337  conn->server_host = host;
1338  conn->keep_alive = keep_alive;
1339  conn->server_port = port;
1340  conn->out_buffer_ptr = conn->out_buffer;
1341  conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1342  conn->connect_vhdr_flags |= MQTT_VHDR_CLEAN_SESSION_FLAG;
1343 
1344  /* convert the string IPv6 address to a numeric IPv6 address */
1345  if(uiplib_ip6addrconv(host, &ip6addr) == 0) {
1346  return MQTT_STATUS_ERROR;
1347  }
1348 
1349  uip_ipaddr_copy(&(conn->server_ip), ipaddr);
1350 
1351  /*
1352  * Initiate the connection if the IP could be resolved. Otherwise the
1353  * connection will be initiated when the DNS lookup is finished, in the main
1354  * event loop.
1355  */
1356  process_post(&mqtt_process, mqtt_do_connect_tcp_event, conn);
1357 
1358  return MQTT_STATUS_OK;
1359 }
1360 /*----------------------------------------------------------------------------*/
1361 void
1362 mqtt_disconnect(struct mqtt_connection *conn)
1363 {
1364  if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1365  return;
1366  }
1367 
1368  conn->state = MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT;
1369 
1370  process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
1371 }
1372 /*----------------------------------------------------------------------------*/
1373 mqtt_status_t
1374 mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic,
1375  mqtt_qos_level_t qos_level)
1376 {
1377  if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1378  return MQTT_STATUS_NOT_CONNECTED_ERROR;
1379  }
1380 
1381  DBG("MQTT - Call to mqtt_subscribe...\n");
1382 
1383  /* Currently don't have a queue, so only one item at a time */
1384  if(conn->out_queue_full) {
1385  DBG("MQTT - Not accepted!\n");
1386  return MQTT_STATUS_OUT_QUEUE_FULL;
1387  }
1388  conn->out_queue_full = 1;
1389  DBG("MQTT - Accepted!\n");
1390 
1391  conn->out_packet.mid = INCREMENT_MID(conn);
1392  conn->out_packet.topic = topic;
1393  conn->out_packet.topic_length = strlen(topic);
1394  conn->out_packet.qos = qos_level;
1395  conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1396 
1397  process_post(&mqtt_process, mqtt_do_subscribe_event, conn);
1398  return MQTT_STATUS_OK;
1399 }
1400 /*----------------------------------------------------------------------------*/
1401 mqtt_status_t
1402 mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic)
1403 {
1404  if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1405  return MQTT_STATUS_NOT_CONNECTED_ERROR;
1406  }
1407 
1408  DBG("MQTT - Call to mqtt_unsubscribe...\n");
1409  /* Currently don't have a queue, so only one item at a time */
1410  if(conn->out_queue_full) {
1411  DBG("MQTT - Not accepted!\n");
1412  return MQTT_STATUS_OUT_QUEUE_FULL;
1413  }
1414  conn->out_queue_full = 1;
1415  DBG("MQTT - Accepted!\n");
1416 
1417  conn->out_packet.mid = INCREMENT_MID(conn);
1418  conn->out_packet.topic = topic;
1419  conn->out_packet.topic_length = strlen(topic);
1420  conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1421 
1422  process_post(&mqtt_process, mqtt_do_unsubscribe_event, conn);
1423  return MQTT_STATUS_OK;
1424 }
1425 /*----------------------------------------------------------------------------*/
1426 mqtt_status_t
1427 mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic,
1428  uint8_t *payload, uint32_t payload_size,
1429  mqtt_qos_level_t qos_level, mqtt_retain_t retain)
1430 {
1431  if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1432  return MQTT_STATUS_NOT_CONNECTED_ERROR;
1433  }
1434 
1435  DBG("MQTT - Call to mqtt_publish...\n");
1436 
1437  /* Currently don't have a queue, so only one item at a time */
1438  if(conn->out_queue_full) {
1439  DBG("MQTT - Not accepted!\n");
1440  return MQTT_STATUS_OUT_QUEUE_FULL;
1441  }
1442  conn->out_queue_full = 1;
1443  DBG("MQTT - Accepted!\n");
1444 
1445  conn->out_packet.mid = INCREMENT_MID(conn);
1446  conn->out_packet.retain = retain;
1447  conn->out_packet.topic = topic;
1448  conn->out_packet.topic_length = strlen(topic);
1449  conn->out_packet.payload = payload;
1450  conn->out_packet.payload_size = payload_size;
1451  conn->out_packet.qos = qos_level;
1452  conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1453 
1454  process_post(&mqtt_process, mqtt_do_publish_event, conn);
1455  return MQTT_STATUS_OK;
1456 }
1457 /*----------------------------------------------------------------------------*/
1458 void
1459 mqtt_set_username_password(struct mqtt_connection *conn, char *username,
1460  char *password)
1461 {
1462  /* Set strings, NULL string will simply set length to zero */
1463  string_to_mqtt_string(&conn->credentials.username, username);
1464  string_to_mqtt_string(&conn->credentials.password, password);
1465 
1466  /* Set CONNECT VHDR flags */
1467  if(username != NULL) {
1468  conn->connect_vhdr_flags |= MQTT_VHDR_USERNAME_FLAG;
1469  } else {
1470  conn->connect_vhdr_flags &= ~MQTT_VHDR_USERNAME_FLAG;
1471  }
1472  if(password != NULL) {
1473  conn->connect_vhdr_flags |= MQTT_VHDR_PASSWORD_FLAG;
1474  } else {
1475  conn->connect_vhdr_flags &= ~MQTT_VHDR_PASSWORD_FLAG;
1476  }
1477 }
1478 /*----------------------------------------------------------------------------*/
1479 void
1480 mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message,
1481  mqtt_qos_level_t qos)
1482 {
1483  /* Set strings, NULL string will simply set length to zero */
1484  string_to_mqtt_string(&conn->will.topic, topic);
1485  string_to_mqtt_string(&conn->will.message, message);
1486 
1487  /* Currently not used! */
1488  conn->will.qos = qos;
1489 
1490  if(topic != NULL) {
1491  conn->connect_vhdr_flags |= MQTT_VHDR_WILL_FLAG |
1492  MQTT_VHDR_WILL_RETAIN_FLAG;
1493  }
1494 }
1495 /*----------------------------------------------------------------------------*/
1496 /** @} */
mqtt_status_t mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic, mqtt_qos_level_t qos_level)
Subscribes to a MQTT topic.
Definition: mqtt.c:1374
static uip_ipaddr_t ipaddr
Pointer to prefix information option in uip_buf.
Definition: uip-nd6.c:125
mqtt_event_t
MQTT engine events.
Definition: mqtt.h:145
void timer_set(struct timer *t, clock_time_t interval)
Set a timer.
Definition: timer.c:64
#define PROCESS(name, strname)
Declare a process.
Definition: process.h:307
Protothreads implementation.
void ctimer_stop(struct ctimer *c)
Stop a pending callback timer.
Definition: ctimer.c:149
#define PROCESS_WAIT_EVENT()
Wait for an event to be posted to the process.
Definition: process.h:141
mqtt_status_t mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port, uint16_t keep_alive)
Connects to a MQTT broker.
Definition: mqtt.c:1325
#define PROCESS_BEGIN()
Define the beginning of a process.
Definition: process.h:120
mqtt_status_t mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic, uint8_t *payload, uint32_t payload_size, mqtt_qos_level_t qos_level, mqtt_retain_t retain)
Publish to a MQTT topic.
Definition: mqtt.c:1427
#define PROCESS_END()
Define the end of a process.
Definition: process.h:131
#define PT_BEGIN(pt)
Declare the start of a protothread inside the C function implementing the protothread.
Definition: pt.h:114
#define PT_WAIT_UNTIL(pt, condition)
Block and wait until condition is true.
Definition: pt.h:147
void(* mqtt_event_callback_t)(struct mqtt_connection *m, mqtt_event_t event, void *data)
MQTT event callback function.
Definition: mqtt.h:296
Header file for IPv6-related data structures.
#define PT_INIT(pt)
Initialize a protothread.
Definition: pt.h:79
#define CLOCK_SECOND
A second, measured in system clock time.
Definition: clock.h:82
Header file for the callback timer
#define PT_END(pt)
Declare the end of a protothread.
Definition: pt.h:126
Event timer header file.
mqtt_status_t mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic)
Unsubscribes from a MQTT topic.
Definition: mqtt.c:1402
Linked list manipulation routines.
void mqtt_set_username_password(struct mqtt_connection *conn, char *username, char *password)
Set the user name and password for a MQTT client.
Definition: mqtt.c:1459
Header file for the Contiki MQTT engine.
void ctimer_set(struct ctimer *c, clock_time_t t, void(*f)(void *), void *ptr)
Set a callback timer.
Definition: ctimer.c:99
int timer_expired(struct timer *t)
Check if a timer has expired.
Definition: timer.c:123
#define uip_ipaddr_copy(dest, src)
Copy an IP address from one place to another.
Definition: uip.h:1018
#define PT_EXIT(pt)
Exit the protothread.
Definition: pt.h:245
void ctimer_restart(struct ctimer *c)
Restart a callback timer from the current point in time.
Definition: ctimer.c:137
#define PT_THREAD(name_args)
Declaration of a protothread.
Definition: pt.h:99
process_event_t process_alloc_event(void)
Allocate a global event number.
Definition: process.c:93
void list_add(list_t list, void *item)
Add an item at the end of a list.
Definition: list.c:142
void list_init(list_t list)
Initialize a list.
Definition: list.c:65
Header file for the uIP TCP/IP stack.
#define LIST(name)
Declare a linked list.
Definition: list.h:88
mqtt_status_t mqtt_register(struct mqtt_connection *conn, struct process *app_process, char *client_id, mqtt_event_callback_t event_callback, uint16_t max_segment_size)
Initializes the MQTT engine.
Definition: mqtt.c:1294
int process_post(struct process *p, process_event_t ev, process_data_t data)
Post an asynchronous event.
Definition: process.c:322
void mqtt_disconnect(struct mqtt_connection *conn)
Disconnects from a MQTT broker.
Definition: mqtt.c:1362
Default definitions of C compiler quirk work-arounds.
PROCESS_THREAD(cc2538_rf_process, ev, data)
Implementation of the cc2538 RF driver process.
Definition: cc2538-rf.c:1035
void mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message, mqtt_qos_level_t qos)
Set the last will topic and message for a MQTT client.
Definition: mqtt.c:1480
Header file for the LED HAL.
void process_start(struct process *p, process_data_t data)
Start a process.
Definition: process.c:99