Commit cc37b99a authored by Michael Ott's avatar Michael Ott
Browse files

Add mosquitto_max_queued_messages_set() function to mosquitto to limit the...

Add mosquitto_max_queued_messages_set() function to mosquitto to limit the number of messages that may be queued.
parent 5847df3e
diff --git a/lib/actions.c b/lib/actions.c
index 5e50dbe..c92c7ab 100644
--- a/lib/actions.c
+++ b/lib/actions.c
@@ -51,6 +51,10 @@ int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int p
if(qos == 0){
return send__publish(mosq, local_mid, topic, payloadlen, payload, qos, retain, false);
}else{
+ if (mosq->max_queued_messages > 0 && (mosq->in_queue_len + mosq->out_queue_len + mosq->inflight_messages + 1 >= mosq->max_queued_messages)) {
+ return MOSQ_ERR_NOMEM;
+ }
+
message = mosquitto__calloc(1, sizeof(struct mosquitto_message_all));
if(!message) return MOSQ_ERR_NOMEM;
diff --git a/lib/loop.c b/lib/loop.c
index e4a985e..ca51772 100644
--- a/lib/loop.c
......@@ -178,3 +193,76 @@ index e4a985e..ca51772 100644
}
#endif
}
diff --git a/lib/messages_mosq.c b/lib/messages_mosq.c
index f6c50b8..6bbc6e2 100644
--- a/lib/messages_mosq.c
+++ b/lib/messages_mosq.c
@@ -399,3 +399,12 @@ int mosquitto_max_inflight_messages_set(struct mosquitto *mosq, unsigned int max
return MOSQ_ERR_SUCCESS;
}
+int mosquitto_max_queued_messages_set(struct mosquitto *mosq, unsigned int max_queued_messages)
+{
+ if(!mosq) return MOSQ_ERR_INVAL;
+
+ mosq->max_queued_messages = max_queued_messages;
+
+ return MOSQ_ERR_SUCCESS;
+}
+
diff --git a/lib/mosquitto.c b/lib/mosquitto.c
index eab1d99..76715f7 100644
--- a/lib/mosquitto.c
+++ b/lib/mosquitto.c
@@ -162,6 +162,7 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_se
mosq->out_messages = NULL;
mosq->out_messages_last = NULL;
mosq->max_inflight_messages = 20;
+ mosq->max_queued_messages = 0;
mosq->will = NULL;
mosq->on_connect = NULL;
mosq->on_publish = NULL;
diff --git a/lib/mosquitto.h b/lib/mosquitto.h
index 57a22ec..9ed8c28 100644
--- a/lib/mosquitto.h
+++ b/lib/mosquitto.h
@@ -1366,6 +1366,27 @@ libmosq_EXPORT int mosquitto_reconnect_delay_set(struct mosquitto *mosq, unsigne
*/
libmosq_EXPORT int mosquitto_max_inflight_messages_set(struct mosquitto *mosq, unsigned int max_inflight_messages);
+/*
+ * Function: mosquitto_max_queued_messages_set
+ *
+ * Set the number of QoS 1 and 2 messages that can be queued at one time.
+ * This will be on top of the "in flight" messages and is meant to limit the
+ * amount of memory used for buffering messages when they cannot be delivered
+ * to the broker immediately.
+ *
+ * Set to 0 for no maximum.
+ *
+ * Parameters:
+ * mosq - a valid mosquitto instance.
+ * max_queued_messages - the maximum number of inflight messages. Defaults
+ * to 0.
+ *
+ * Returns:
+ * MOSQ_ERR_SUCCESS - on success.
+ * MOSQ_ERR_INVAL - if the input parameters were invalid.
+ */
+libmosq_EXPORT int mosquitto_max_queued_messages_set(struct mosquitto *mosq, unsigned int max_queued_messages);
+
/*
* Function: mosquitto_message_retry_set
*
diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h
index c6b3d6e..7505916 100644
--- a/lib/mosquitto_internal.h
+++ b/lib/mosquitto_internal.h
@@ -271,6 +271,7 @@ struct mosquitto {
struct mosquitto__packet *out_packet_last;
int inflight_messages;
int max_inflight_messages;
+ int max_queued_messages;
# ifdef WITH_SRV
ares_channel achan;
# endif
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment