mqtt: improve the state machine

To handle PUBLISH before SUBACK and more.

Updated the existing tests and added three new ones.

Reported-by: Christoph Krey
Bug: https://curl.haxx.se/mail/lib-2020-04/0021.html
Closes #5246
diff --git a/lib/mqtt.c b/lib/mqtt.c
index 3e24469..35c1b3e 100644
--- a/lib/mqtt.c
+++ b/lib/mqtt.c
@@ -51,8 +51,8 @@
 #define MQTT_MSG_SUBACK    0x90
 #define MQTT_MSG_DISCONNECT 0xe0
 
-#define MQTT_CONNACK_LEN 4
-#define MQTT_SUBACK_LEN 5
+#define MQTT_CONNACK_LEN 2
+#define MQTT_SUBACK_LEN 3
 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
 
 /*
@@ -194,13 +194,9 @@
   }
 
   /* verify CONNACK */
-  if(readbuf[0] != MQTT_MSG_CONNACK ||
-     readbuf[1] != 0x02 ||
-     readbuf[2] != 0x00 ||
-     readbuf[3] != 0x00) {
-    failf(data, "Expected %02x%02x%02x%02x but got %02x%02x%02x%02x",
-          MQTT_MSG_CONNACK, 0x02, 0x00, 0x00,
-          readbuf[0], readbuf[1], readbuf[2], readbuf[3]);
+  if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
+    failf(data, "Expected %02x%02x but got %02x%02x",
+          0x00, 0x00, readbuf[0], readbuf[1]);
     result = CURLE_WEIRD_SERVER_REPLY;
   }
 
@@ -285,6 +281,9 @@
   return result;
 }
 
+/*
+ * Called when the first byte was already read.
+ */
 static CURLcode mqtt_verify_suback(struct connectdata *conn)
 {
   CURLcode result;
@@ -307,11 +306,9 @@
   }
 
   /* verify SUBACK */
-  if(readbuf[0] != MQTT_MSG_SUBACK ||
-     readbuf[1] != 0x03 ||
-     readbuf[2] != ((mqtt->packetid >> 8) & 0xff) ||
-     readbuf[3] != (mqtt->packetid & 0xff) ||
-     readbuf[4] != 0x00)
+  if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
+     readbuf[1] != (mqtt->packetid & 0xff) ||
+     readbuf[2] != 0x00)
     result = CURLE_WEIRD_SERVER_REPLY;
 
 fail:
@@ -377,67 +374,97 @@
     mult *= 128;
   }
 
-  *lenbytes = i;
+  if(lenbytes)
+    *lenbytes = i;
 
   return len;
 }
 
+#ifdef CURLDEBUG
+static const char *statenames[]={
+  "MQTT_FIRST",
+  "MQTT_REMAINING_LENGTH",
+  "MQTT_CONNACK",
+  "MQTT_SUBACK",
+  "MQTT_SUBACK_COMING",
+  "MQTT_PUBWAIT",
+  "MQTT_PUB_REMAIN"
+};
+#endif
+
+/* The only way to change state */
+static void mqstate(struct connectdata *conn,
+                    enum mqttstate state,
+                    enum mqttstate nextstate) /* used if state == FIRST */
+{
+  struct mqtt_conn *mqtt = &conn->proto.mqtt;
+#ifdef CURLDEBUG
+  infof(conn->data, "%s (from %s) (next is %s)\n",
+        statenames[state],
+        statenames[mqtt->state],
+        (state == MQTT_FIRST)? statenames[nextstate] : "");
+#endif
+  mqtt->state = state;
+  if(state == MQTT_FIRST)
+    mqtt->nextstate = nextstate;
+}
+
+
 /* for the publish packet */
 #define MQTT_HEADER_LEN 5    /* max 5 bytes */
 
 static CURLcode mqtt_read_publish(struct connectdata *conn,
                                   bool *done)
 {
-  CURLcode result;
+  CURLcode result = CURLE_OK;
   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
   ssize_t nread;
   struct Curl_easy *data = conn->data;
   unsigned char *pkt = (unsigned char *)data->state.buffer;
-  size_t remlen, lenbytes;
+  size_t remlen;
   struct mqtt_conn *mqtt = &conn->proto.mqtt;
   struct MQTT *mq = data->req.protop;
+  unsigned char packet;
 
   switch(mqtt->state) {
-  case MQTT_SUBWAIT:
-    /* Read the initial byte and the entire Remaining Length field
-       in this state */
-    result = Curl_read(conn, sockfd, (char *)&pkt[mq->npacket], 1, &nread);
+  MQTT_SUBACK_COMING:
+  case MQTT_SUBACK_COMING:
+    result = mqtt_verify_suback(conn);
     if(result)
+      break;
+
+    mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
+    break;
+
+  case MQTT_SUBACK:
+  case MQTT_PUBWAIT:
+    /* we are expecting PUBLISH or SUBACK */
+    packet = mq->firstbyte & 0xf0;
+    if(packet == MQTT_MSG_PUBLISH)
+      mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE);
+    else if(packet == MQTT_MSG_SUBACK) {
+      mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE);
+      goto MQTT_SUBACK_COMING;
+    }
+    else if(packet == MQTT_MSG_DISCONNECT) {
+      infof(data, "Got DISCONNECT\n");
+      *done = TRUE;
       goto end;
-    if(data->set.verbose)
-      Curl_debug(data, CURLINFO_HEADER_IN, (char *)&pkt[mq->npacket], 1);
-    /* we are expecting a PUBLISH message */
-    if(!mq->npacket && ((pkt[0] & 0xf0) != MQTT_MSG_PUBLISH)) {
-      if(pkt[0] == MQTT_MSG_DISCONNECT) {
-        infof(data, "Got DISCONNECT\n");
-        *done = TRUE;
-        goto end;
-      }
+    }
+    else {
       result = CURLE_WEIRD_SERVER_REPLY;
       goto end;
     }
-    else if((mq->npacket >= 1) && !(pkt[mq->npacket] & 0x80))
-      /* as long as the high bit is set in the length byte, we read one more
-         byte, then get the remainder of the PUBLISH */
-      mqtt->state = MQTT_SUB_REMAIN;
-    mq->npacket++;
-    if(mqtt->state == MQTT_SUBWAIT)
-      return result;
 
     /* -- switched state -- */
-
-    /* remember the first byte */
-    mq->firstbyte = pkt[0];
-
-    remlen = mqtt_decode_len(&pkt[1], 4, &lenbytes);
-
+    remlen = mq->remaining_length;
     infof(data, "Remaining length: %zd bytes\n", remlen);
     Curl_pgrsSetDownloadSize(data, remlen);
     data->req.bytecount = 0;
     data->req.size = remlen;
     mq->npacket = remlen; /* get this many bytes */
     /* FALLTHROUGH */
-  case MQTT_SUB_REMAIN: {
+  case MQTT_PUB_REMAIN: {
     /* read rest of packet, but no more. Cap to buffer size */
     struct SingleRequest *k = &data->req;
     size_t rest = mq->npacket;
@@ -450,6 +477,11 @@
       }
       goto end;
     }
+    if(!nread) {
+      infof(data, "server disconnected\n");
+      result = CURLE_PARTIAL_FILE;
+      goto end;
+    }
     if(data->set.verbose)
       Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
 
@@ -465,7 +497,7 @@
 
     if(!mq->npacket)
       /* no more PUBLISH payload, back to subscribe wait state */
-      mqtt->state = MQTT_SUBWAIT;
+      mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
     break;
   }
   default:
@@ -481,7 +513,6 @@
 {
   CURLcode result = CURLE_OK;
   struct Curl_easy *data = conn->data;
-  struct mqtt_conn *mqtt = &conn->proto.mqtt;
 
   *done = FALSE; /* unconditionally */
 
@@ -490,7 +521,7 @@
     failf(data, "Error %d sending MQTT CONN request", result);
     return result;
   }
-  mqtt->state = MQTT_CONNACK;
+  mqstate(conn, MQTT_FIRST, MQTT_CONNACK);
   return CURLE_OK;
 }
 
@@ -500,6 +531,10 @@
   struct mqtt_conn *mqtt = &conn->proto.mqtt;
   struct Curl_easy *data = conn->data;
   struct MQTT *mq = data->req.protop;
+  ssize_t nread;
+  curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
+  unsigned char *pkt = (unsigned char *)data->state.buffer;
+  unsigned char byte;
 
   *done = FALSE;
 
@@ -512,7 +547,41 @@
       return result;
   }
 
+  infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state);
   switch(mqtt->state) {
+  case MQTT_FIRST:
+    /* Read the initial byte only */
+    result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread);
+    if(result)
+      break;
+    if(data->set.verbose)
+      Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
+    /* remember the first byte */
+    mq->npacket = 0;
+    mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
+    /* FALLTHROUGH */
+  case MQTT_REMAINING_LENGTH:
+    do {
+      result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread);
+      if(result)
+        break;
+      if(data->set.verbose)
+        Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
+      pkt[mq->npacket++] = byte;
+    } while((byte & 0x80) && (mq->npacket < 4));
+    mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
+    mq->npacket = 0;
+    if(mq->remaining_length) {
+      mqstate(conn, mqtt->nextstate, MQTT_NOSTATE);
+      break;
+    }
+    mqstate(conn, MQTT_FIRST, MQTT_FIRST);
+
+    if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
+      infof(data, "Got DISCONNECT\n");
+      *done = TRUE;
+    }
+    break;
   case MQTT_CONNACK:
     result = mqtt_verify_connack(conn);
     if(result)
@@ -524,24 +593,19 @@
         result = mqtt_disconnect(conn);
         *done = TRUE;
       }
+      mqtt->nextstate = MQTT_FIRST;
     }
     else {
       result = mqtt_subscribe(conn);
-      if(!result)
-        mqtt->state = MQTT_SUBACK;
+      if(!result) {
+        mqstate(conn, MQTT_FIRST, MQTT_SUBACK);
+      }
     }
     break;
 
   case MQTT_SUBACK:
-    result = mqtt_verify_suback(conn);
-    if(result)
-      break;
-
-    mqtt->state = MQTT_SUBWAIT;
-    break;
-
-  case MQTT_SUBWAIT:
-  case MQTT_SUB_REMAIN:
+  case MQTT_PUBWAIT:
+  case MQTT_PUB_REMAIN:
     result = mqtt_read_publish(conn, done);
     if(result)
       break;
diff --git a/lib/mqtt.h b/lib/mqtt.h
index b5e447b..155fbd6 100644
--- a/lib/mqtt.h
+++ b/lib/mqtt.h
@@ -26,13 +26,22 @@
 extern const struct Curl_handler Curl_handler_mqtt;
 #endif
 
+enum mqttstate {
+  MQTT_FIRST,             /* 0 */
+  MQTT_REMAINING_LENGTH,  /* 1 */
+  MQTT_CONNACK,           /* 2 */
+  MQTT_SUBACK,            /* 3 */
+  MQTT_SUBACK_COMING,     /* 4 - the SUBACK remainder */
+  MQTT_PUBWAIT,    /* 5 - wait for publish */
+  MQTT_PUB_REMAIN,  /* 6 - wait for the remainder of the publish */
+
+  MQTT_NOSTATE = 99 /* never an actual state */
+};
+
 struct mqtt_conn {
-  enum {
-    MQTT_CONNACK,
-    MQTT_SUBACK,
-    MQTT_SUBWAIT,    /* wait for subscribe response */
-    MQTT_SUB_REMAIN  /* wait for the remainder of the subscribe response */
-  } state;
+  enum mqttstate state;
+  enum mqttstate nextstate; /* switch to this after remaining length is
+                               done */
   unsigned int packetid;
 };
 
@@ -41,9 +50,10 @@
   char *sendleftovers;
   size_t nsend; /* size of sendleftovers */
 
-  /* when receving a PUBLISH */
+  /* when receving */
   size_t npacket; /* byte counter */
   unsigned char firstbyte;
+  size_t remaining_length;
 };
 
 #endif /* HEADER_CURL_MQTT_H */
diff --git a/tests/data/Makefile.inc b/tests/data/Makefile.inc
index 0999404..425a0c0 100644
--- a/tests/data/Makefile.inc
+++ b/tests/data/Makefile.inc
@@ -139,7 +139,7 @@
 \
 test1170 test1171 test1172 test1173 test1174 test1175 test1176 test1177 \
 \
-test1190 test1191 test1192 test1193 \
+test1190 test1191 test1192 test1193 test1194 test1195 test1196 \
 \
 test1200 test1201 test1202 test1203 test1204 test1205 test1206 test1207 \
 test1208 test1209 test1210 test1211 test1212 test1213 test1214 test1215 \
diff --git a/tests/data/test1190 b/tests/data/test1190
index 491f2b8..007a150 100644
--- a/tests/data/test1190
+++ b/tests/data/test1190
@@ -46,7 +46,7 @@
 </strippart>
 <protocol>
 client CONNECT 18 00044d5154540402003c000c6375726c
-server CONACK 2 20020000
+server CONNACK 2 20020000
 client SUBSCRIBE 9 000100043131393000
 server SUBACK 3 9003000100
 server PUBLISH c 300c00043131393068656c6c6f0a
diff --git a/tests/data/test1191 b/tests/data/test1191
index fc8c68b..a36bc31 100644
--- a/tests/data/test1191
+++ b/tests/data/test1191
@@ -42,7 +42,7 @@
 </strippart>
 <protocol>
 client CONNECT 18 00044d5154540402003c000c6375726c
-server CONACK 2 20020000
+server CONNACK 2 20020000
 client PUBLISH f 000431313931736f6d657468696e67
 client DISCONNECT 0 e000
 </protocol>
diff --git a/tests/data/test1192 b/tests/data/test1192
index 92b96c3..691c778 100644
--- a/tests/data/test1192
+++ b/tests/data/test1192
@@ -46,7 +46,7 @@
 </strippart>
 <protocol>
 client CONNECT 18 00044d5154540402003c000c6375726c
-server CONACK 2 20020000
+server CONNACK 2 20020000
 client SUBSCRIBE 80af3131393200
 server SUBACK 3 9003000100
 server PUBLISH 80d 308df3131393268656c6c6f0a
diff --git a/tests/data/test1193 b/tests/data/test1193
index 479ed5f..8da9abb 100644
--- a/tests/data/test1193
+++ b/tests/data/test1193
@@ -64,7 +64,7 @@
 </strippart>
 <protocol>
 client CONNECT 18 00044d5154540402003c000c6375726c
-server CONACK 2 20020000
+server CONNACK 2 20020000
 client PUBLISH 7c2 000431313933313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839
 client DISCONNECT 0 e000
 </protocol>
diff --git a/tests/data/test1194 b/tests/data/test1194
new file mode 100644
index 0000000..497891a
--- /dev/null
+++ b/tests/data/test1194
@@ -0,0 +1,59 @@
+<testcase>
+<info>
+<keywords>
+MQTT
+MQTT SUBSCRIBE
+</keywords>
+</info>
+
+#
+# Server-side
+<reply>
+<data nocheck="yes">
+hello
+</data>
+<datacheck hex="yes">
+00 04 31 31 39 30   68 65 6c 6c 6f 5b 4c 46 5d 0a
+</datacheck>
+<servercmd>
+PUBLISH-before-SUBACK TRUE
+</servercmd>
+</reply>
+
+#
+# Client-side
+<client>
+<features>
+mqtt
+</features>
+<server>
+mqtt
+</server>
+<name>
+MQTT SUBSCRIBE with PUBLISH befoire SUBACK
+</name>
+<command option="binary-trace">
+mqtt://%HOSTIP:%MQTTPORT/1194
+</command>
+</client>
+
+#
+# Verify data after the test has been "shot"
+<verify>
+# These are hexadecimal protocol dumps from the client
+#
+# Strip out the random part of the client id from the CONNECT message
+# before comparison
+<strippart>
+s/^(.* 00044d5154540402003c000c6375726c).*/$1/
+</strippart>
+<protocol>
+client CONNECT 18 00044d5154540402003c000c6375726c
+server CONNACK 2 20020000
+client SUBSCRIBE 9 000100043131393400
+server PUBLISH c 300c00043131393468656c6c6f0a
+server SUBACK 3 9003000100
+server DISCONNECT 0 e000
+</protocol>
+</verify>
+</testcase>
diff --git a/tests/data/test1195 b/tests/data/test1195
new file mode 100644
index 0000000..0dfaccd
--- /dev/null
+++ b/tests/data/test1195
@@ -0,0 +1,63 @@
+<testcase>
+<info>
+<keywords>
+MQTT
+MQTT SUBSCRIBE
+</keywords>
+</info>
+
+#
+# Server-side
+<reply>
+<data nocheck="yes">
+hello
+</data>
+<datacheck hex="yes">
+00 04 31 31 39 30   68 65 6c 6c 6f 5b 4c 46 5d 0a
+</datacheck>
+<servercmd>
+PUBLISH-before-SUBACK TRUE
+short-PUBLISH TRUE
+</servercmd>
+</reply>
+
+#
+# Client-side
+<client>
+<features>
+mqtt
+</features>
+<server>
+mqtt
+</server>
+<name>
+MQTT SUBSCRIBE with short PUBLISH
+</name>
+<command option="binary-trace">
+mqtt://%HOSTIP:%MQTTPORT/1195
+</command>
+</client>
+
+#
+# Verify data after the test has been "shot"
+<verify>
+# These are hexadecimal protocol dumps from the client
+#
+# Strip out the random part of the client id from the CONNECT message
+# before comparison
+<strippart>
+s/^(.* 00044d5154540402003c000c6375726c).*/$1/
+</strippart>
+<protocol>
+client CONNECT 18 00044d5154540402003c000c6375726c
+server CONNACK 2 20020000
+client SUBSCRIBE 9 000100043131393500
+server PUBLISH c 300c00043131393568656c6c
+</protocol>
+
+# 18 is CURLE_PARTIAL_FILE
+<errorcode>
+18
+</errorcode>
+</verify>
+</testcase>
diff --git a/tests/data/test1196 b/tests/data/test1196
new file mode 100644
index 0000000..c07efd9
--- /dev/null
+++ b/tests/data/test1196
@@ -0,0 +1,62 @@
+<testcase>
+<info>
+<keywords>
+MQTT
+MQTT SUBSCRIBE
+</keywords>
+</info>
+
+#
+# Server-side
+<reply>
+<data nocheck="yes">
+hello
+</data>
+<datacheck hex="yes">
+00 04 31 31 39 30   68 65 6c 6c 6f 5b 4c 46 5d 0a
+</datacheck>
+
+# error 1 - "Connection Refused, unacceptable protocol version"
+<servercmd>
+error-CONNACK 1
+</servercmd>
+</reply>
+
+#
+# Client-side
+<client>
+<features>
+mqtt
+</features>
+<server>
+mqtt
+</server>
+<name>
+MQTT with error in CONNACK
+</name>
+<command option="binary-trace">
+mqtt://%HOSTIP:%MQTTPORT/1196
+</command>
+</client>
+
+#
+# Verify data after the test has been "shot"
+<verify>
+# These are hexadecimal protocol dumps from the client
+#
+# Strip out the random part of the client id from the CONNECT message
+# before comparison
+<strippart>
+s/^(.* 00044d5154540402003c000c6375726c).*/$1/
+</strippart>
+<protocol>
+client CONNECT 18 00044d5154540402003c000c6375726c
+server CONNACK 2 20020001
+</protocol>
+
+# 8 is CURLE_WEIRD_SERVER_REPLY
+<errorcode>
+8
+</errorcode>
+</verify>
+</testcase>
diff --git a/tests/server/mqttd.c b/tests/server/mqttd.c
index db5723c..6785b00 100644
--- a/tests/server/mqttd.c
+++ b/tests/server/mqttd.c
@@ -104,6 +104,10 @@
 struct configurable {
   unsigned char version; /* initial version byte in the request must match
                             this */
+  bool publish_before_suback;
+  bool short_publish;
+  unsigned char error_connack;
+  int testnum;
 };
 
 #define REQUEST_DUMP  "log/server.input"
@@ -124,6 +128,10 @@
 {
   logmsg("Reset to defaults");
   config.version = CONFIG_VERSION;
+  config.publish_before_suback = FALSE;
+  config.short_publish = FALSE;
+  config.error_connack = 0;
+  config.testnum = 0;
 }
 
 static unsigned char byteval(char *value)
@@ -147,10 +155,29 @@
           config.version = byteval(value);
           logmsg("version [%d] set", config.version);
         }
+        else if(!strcmp(key, "PUBLISH-before-SUBACK")) {
+          logmsg("PUBLISH-before-SUBACK set");
+          config.publish_before_suback = TRUE;
+        }
+        else if(!strcmp(key, "short-PUBLISH")) {
+          logmsg("short-PUBLISH set");
+          config.short_publish = TRUE;
+        }
+        else if(!strcmp(key, "error-CONNACK")) {
+          config.error_connack = byteval(value);
+          logmsg("error-CONNACK = %d", config.error_connack);
+        }
+        else if(!strcmp(key, "Testnum")) {
+          config.testnum = atoi(value);
+          logmsg("testnum = %d", config.testnum);
+        }
       }
     }
     fclose(fp);
   }
+  else {
+    logmsg("No config file '%s' to read", configfile);
+  }
 }
 
 static void loghex(unsigned char *buffer, ssize_t len)
@@ -209,11 +236,17 @@
     MQTT_MSG_CONNACK, 0x02,
     0x00, 0x00
   };
-  ssize_t rc = swrite(fd, (char *)packet, sizeof(packet));
-  if(rc == sizeof(packet)) {
-    logmsg("WROTE %d bytes [CONACK]", rc);
+  ssize_t rc;
+
+  packet[3] = config.error_connack;
+
+  rc = swrite(fd, (char *)packet, sizeof(packet));
+  if(rc > 0) {
+    logmsg("WROTE %d bytes [CONNACK]", rc);
     loghex(packet, rc);
-    logprotocol(FROM_SERVER, "CONACK", 2, dump, packet, sizeof(packet));
+    logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet));
+  }
+  if(rc == sizeof(packet)) {
     return 0;
   }
   return 1;
@@ -360,6 +393,7 @@
   size_t payloadindex;
   ssize_t remaininglength = topiclen + 2 + payloadlen;
   ssize_t packetlen;
+  ssize_t sendamount;
   ssize_t rc;
   char rembuffer[4];
   int encodedlen;
@@ -385,13 +419,18 @@
   payloadindex = 3 + topiclen + encodedlen;
   memcpy(&packet[payloadindex], payload, payloadlen);
 
-  rc = swrite(fd, (char *)packet, packetlen);
-  if(rc == packetlen) {
+  sendamount = packetlen;
+  if(config.short_publish)
+    sendamount -= 2;
+
+  rc = swrite(fd, (char *)packet, sendamount);
+  if(rc > 0) {
     logmsg("WROTE %d bytes [PUBLISH]", rc);
     loghex(packet, rc);
     logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc);
-    return 0;
   }
+  if(rc == packetlen)
+    return 0;
   return 1;
 }
 
@@ -459,6 +498,11 @@
 
   getconfig();
 
+  testno = config.testnum;
+
+  if(testno)
+    logmsg("Found test number %ld", testno);
+
   do {
     /* get the fixed header */
     rc = fixedheader(fd, &byte, &remaining_length, &bytes);
@@ -506,8 +550,10 @@
       }
     }
     else if(byte == MQTT_MSG_SUBSCRIBE) {
-      char *testnop;
-
+      FILE *stream;
+      int error;
+      char *data;
+      size_t datalen;
       logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length,
                   dump, buffer, rc);
       logmsg("Incoming SUBSCRIBE");
@@ -533,26 +579,25 @@
       /* there's a QoS byte (two bits) after the topic */
 
       logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id);
-      if(suback(dump, fd, packet_id)) {
-        logmsg("failed sending SUBACK");
-        goto end;
-      }
-      testnop = strrchr(topic, '/');
-      if(!testnop)
-        testnop = topic;
-      else
-        testnop++; /* pass the slash */
-      testno = strtol(testnop, NULL, 10);
-      if(testno) {
-        FILE *stream;
-        int error;
-        char *data;
-        size_t datalen;
-        logmsg("Found test number %ld", testno);
-        stream = test2fopen(testno);
-        error = getpart(&data, &datalen, "reply", "data", stream);
-        if(!error)
-          publish(dump, fd, packet_id, topic, data, datalen);
+      stream = test2fopen(testno);
+      error = getpart(&data, &datalen, "reply", "data", stream);
+      if(!error) {
+        if(!config.publish_before_suback) {
+          if(suback(dump, fd, packet_id)) {
+            logmsg("failed sending SUBACK");
+            goto end;
+          }
+        }
+        if(publish(dump, fd, packet_id, topic, data, datalen)) {
+          logmsg("PUBLISH failed");
+          goto end;
+        }
+        if(config.publish_before_suback) {
+          if(suback(dump, fd, packet_id)) {
+            logmsg("failed sending SUBACK");
+            goto end;
+          }
+        }
       }
       else {
         char *def = (char *)"this is random payload yes yes it is";