Metadata Broker  0.23.2
consumer/main.c
/*
* Copyright Axis Communications AB
*
* This example creates a subscriber and subscribes to metadata from a channel
* with a given topic and source.
*/
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/syslog.h>
#include <syslog.h>
#include <time.h>
#include <unistd.h>
#include <mdb/connection.h>
#include <mdb/error.h>
#include <mdb/subscriber.h>
typedef struct channel_identifier {
char *topic;
char *source;
} channel_identifier_t;
static void on_connection_error(mdb_error_t *error, void *user_data)
{
(void)user_data;
syslog(LOG_ERR, "Got connection error: %s, Aborting...", error->message);
abort();
}
static void on_metadata(const mdb_metadata_t *metadata, void *user_data)
{
const struct timespec *timestamp = mdb_metadata_get_timestamp(metadata);
channel_identifier_t *expression = (channel_identifier_t *)user_data;
// Process the metadata.
syslog(LOG_INFO,
"metadata received from topic: %s on source: %s: Monotonic time - "
"%lld.%.9ld. Data - %.*s",
expression->topic, expression->source, (long long)timestamp->tv_sec,
timestamp->tv_nsec, (int)payload->size, (char *)payload->data);
}
static void on_done_subscriber_create(const mdb_error_t *error, void *user_data)
{
if (error != NULL) {
syslog(LOG_ERR, "Got subscription error: %s, Aborting...", error->message);
abort();
}
channel_identifier_t *expression = (channel_identifier_t *)user_data;
syslog(LOG_INFO, "Subscribed to %s (%s)...", expression->topic,
expression->source);
}
void sig_handler(int signum)
{
(void)signum;
// Do nothing, just let pause in main return.
}
int main(int argc, char **argv)
{
syslog(LOG_INFO, "Subscriber started...");
channel_identifier_t channel_identifier = {.topic = "test_topic",
.source = "1"};
if (argc > 1) {
channel_identifier.topic = argv[1];
};
mdb_error_t *error = NULL;
mdb_subscriber_config_t *subscriber_config = NULL;
mdb_subscriber_t *subscriber = NULL;
mdb_connection_t *connection =
mdb_connection_create(on_connection_error, NULL, &error);
if (error != NULL) {
goto end;
}
subscriber_config = mdb_subscriber_config_create(
channel_identifier.topic, channel_identifier.source, on_metadata,
&channel_identifier, &error);
if (error != NULL) {
goto end;
}
subscriber = mdb_subscriber_create_async(connection, subscriber_config,
on_done_subscriber_create,
&channel_identifier, &error);
if (error != NULL) {
goto end;
}
// Add signal handler to allow for cleanup on ordered termination.
(void)signal(SIGTERM, sig_handler);
(void)signal(SIGINT, sig_handler);
pause();
end:
if (error != NULL) {
syslog(LOG_ERR, "%s", error->message);
}
mdb_subscriber_config_destroy(&subscriber_config);
mdb_subscriber_destroy(&subscriber);
mdb_connection_destroy(&connection);
syslog(LOG_INFO, "Subscriber closed...");
return 0;
}
struct mdb_connection mdb_connection_t
Definition: connection.h:29
mdb_connection_t * mdb_connection_create(mdb_on_error_t on_error, void *user_data, mdb_error_t **error)
void mdb_connection_destroy(mdb_connection_t **self)
void mdb_error_destroy(mdb_error_t **error)
const struct timespec * mdb_metadata_get_timestamp(const mdb_metadata_t *metadata)
struct mdb_metadata mdb_metadata_t
Definition: metadata.h:19
const mdb_metadata_payload_t * mdb_metadata_get_payload(const mdb_metadata_t *metadata)
Definition: error.h:41
char * message
Definition: error.h:43
Definition: metadata.h:23
uint8_t * data
Definition: metadata.h:25
size_t size
Definition: metadata.h:24
void mdb_subscriber_destroy(mdb_subscriber_t **self)
struct mdb_subscriber mdb_subscriber_t
Definition: subscriber.h:28
mdb_subscriber_t * mdb_subscriber_create_async(mdb_connection_t *connection, mdb_subscriber_config_t *config, mdb_on_done_t on_done, void *user_data, mdb_error_t **error)
mdb_subscriber_config_t * mdb_subscriber_config_create(const char *topic, const char *source, mdb_subscriber_on_metadata_t on_metadata, void *user_data, mdb_error_t **error)
void mdb_subscriber_config_destroy(mdb_subscriber_config_t **self)
struct mdb_subscriber_config mdb_subscriber_config_t
Definition: subscriber_config.h:25