MQTT Subscriber
This document is continuation of the Publisher to preview the subscriber demo. Prefer to read that first.
The MQ client will be automatically added to container once the needed service configurations available.
zig
ctx.MQ.publish("topic"); //publishes message to a topic on the subscribed client
app.addSubscription("topic", subscriber-handler); //listens for upcoming event and injects into subscriber handler for further actions.Limitations
- Upto 32KB size of message supported by default.
- Subscribes to only one topic now, work in progress to make it available for multiple
topicswithout any change from app developers.
Example
This document demonstrates the subscribing to a topic using zero built-in solution MQ client.
- Refer following
zero-mqtt-subscriberexample further to know more on getting started of this.
zig
const std = @import("std");
const zero = @import("zero");
const App = zero.App;
const Context = zero.Context;
const utils = zero.utils;
pub const std_options: std.Options = .{
.logFn = zero.logger.custom,
};
const pubSubTopic = "zero";
pub fn main() !void {
var arena_instance = std.heap.ArenaAllocator.init(std.heap.page_allocator);
defer arena_instance.deinit();
const allocator = arena_instance.allocator();
const app = try App.new(allocator);
try app.addSubscription(pubSubTopic, subscribeTask);
// prefer only one topic for now
// second one will not be captured
// try app.addSubscription("second", subscribeTask);
try app.run();
}
const customMessage = struct {
msg: []const u8 = undefined,
topic: []const u8 = undefined,
};
fn subscribeTask(ctx: *Context) !void {
const timestamp = try utils.sqlTimestampz(ctx.allocator);
var m: customMessage = undefined;
//transform ctx.message to custom type in packet read itself
if (ctx.message) |message| {
m = customMessage{};
m.msg = message.payload.?;
m.topic = message.topic;
var buffer: []u8 = undefined;
buffer = try ctx.allocator.alloc(u8, 1024);
buffer = try std.fmt.bufPrint(buffer, "Received on [{s}] {s}", .{ m.topic, m.msg });
defer ctx.allocator.free(buffer);
ctx.log.info(timestamp);
ctx.log.info(buffer);
}
}- Boom! lets build and run our app.
bash
zero/examples/zero-mqtt-subscriber on main [✘!?] via ↯ v0.15.1
❯ zig build pubsub
INFO [04:15:48] Loaded config from file: ./configs/.env
INFO [04:15:48] config overriden ./configs/.dev.env file not found.
DEBUG [04:15:48] database is disabled, as dialect is not provided.
DEBUG [04:15:48] redis is disabled, as redis host is not provided.
INFO [04:15:48] connecting to MQTT at '127.0.0.1:1883'
INFO [04:15:48] MQTT server connected
INFO [04:15:48] MQTT client id auto-A81E9F0C-BABF-707F-B87E-521075B1E3D9
INFO [04:15:48] connected to MQTT at '127.0.0.1:1883'
INFO [04:15:48] container is being created
INFO [04:15:48] no authentication mode found and disabled.
INFO [04:15:48] zero-mqtt-subscriber app pid 24069
INFO [04:15:48] topic:zero pubsub subscriber added
INFO [04:15:48] registered static files from directory ./static
INFO [04:15:48] starting subscriptions
INFO [04:15:48] Starting server on port: 8080
INFO [04:16:00] 2025-11-03T04:16:00
INFO [04:16:00] Received on [zero] publisher 1 says hello!
INFO [04:16:01] 2025-11-03T04:16:01
INFO [04:16:01] Received on [zero] publisher 2 says hi!
INFO [04:16:30] 2025-11-03T04:16:30
INFO [04:16:30] Received on [zero] publisher 2 says hi!
INFO [04:17:00] 2025-11-03T04:17:00
INFO [04:17:00] Received on [zero] publisher 1 says hello!
INFO [04:17:01] 2025-11-03T04:17:01
INFO [04:17:01] Received on [zero] publisher 2 says hi!- Preview server status, subscription and publish status.


Recommendation
🚩 It is highly recommended to use the ctx allocator whenever possible, since it is tied up with request life-cycle, the de-allocation will be managed automatically and making sure the memory leak is not happening.

