123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402 |
- /*
- * ZMap Redis Helpers Copyright 2013 Regents of the University of Michigan
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy
- * of the License at http://www.apache.org/licenses/LICENSE-2.0
- */
- #include "redis.h"
- #include <string.h>
- #include <stdlib.h>
- #include <stdio.h>
- #include <stdint.h>
- #include <assert.h>
- #include <hiredis/hiredis.h>
- #include "logger.h"
- #include "xalloc.h"
- #define REDIS_TIMEOUT 2
- #undef MIN
- #define MIN(X,Y) ((X) < (Y) ? (X) : (Y))
- static redisContext *rctx;
- redisconf_t *redis_parse_connstr(char *connstr)
- {
- redisconf_t *retv = xmalloc(sizeof(redisconf_t));
- if (!strncmp("tcp://", connstr, 6)) {
- char *servername = xmalloc(strlen(connstr));
- char *list_name = xmalloc(strlen(connstr));
- uint32_t port;
- if (sscanf(connstr, "tcp://%[^:]:%u/%s", servername,
- &port, list_name) != 3) {
- log_fatal("redis", "unable to parse redis connection string. This "
- "should be of the form tcp://server:port/list-name "
- "for TCP connections. All fields are required.");
- }
- retv->type = T_TCP;
- retv->server = servername;
- retv->port = port;
- retv->list_name = list_name;
- retv->path = NULL;
- } else if (!strncmp("local://", connstr, 8)) {
- // looking for something along the lines of
- // local:///tmp/redis.sock/list-name
- char *path = xmalloc(strlen(connstr));
- char *list_name = xmalloc(strlen(connstr));
- connstr = connstr + (size_t) 8;
- char *listname = strrchr(connstr, '/') + (size_t) 1;
- connstr[strrchr(connstr, '/') - connstr] = '\0';
- strcpy(path, connstr);
- strcpy(list_name, listname);
- retv->type = T_LOCAL;
- retv->list_name = list_name;
- retv->path = path;
- retv->server = NULL;
- retv->port = 0;
- } else {
- log_fatal("redis", "unable to parse connection string. does not begin with "
- "local:// or tcp:// as expected");
- }
- return retv;
- }
- redisContext* redis_connect(char *connstr)
- {
- redisconf_t *c;
- // handle old behavior where we only connected to a specific
- // socket that we #defined.
- if (!connstr) {
- c = xmalloc(sizeof(redisconf_t));
- c->type = T_LOCAL;
- c->path = strdup("/tmp/redis.sock");
- } else {
- c = redis_parse_connstr(connstr);
- assert(c);
- }
- struct timeval timeout;
- timeout.tv_sec = REDIS_TIMEOUT;
- timeout.tv_usec = 0;
- if (c->type == T_LOCAL) {
- return (redisContext*) redisConnectUnixWithTimeout(c->path,
- timeout);
- } else {
- return (redisContext*) redisConnectWithTimeout(c->server,
- c->port, timeout);
- }
- }
- static int chkerr(redisContext *rctx, redisReply *reply)
- {
- if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
- log_error("redis", "an error occurred when "
- "retrieving item from redis: %s",
- rctx->errstr);
- if (reply) {
- freeReplyObject(reply);
- }
- return -1;
- }
- return 0;
- }
- int redis_init(char *connstr)
- {
- rctx = redis_connect(connstr);
- if (!rctx) {
- return -1;
- }
- return 0;
- }
- int redis_close(void)
- {
- redisFree(rctx);
- return 0;
- }
- redisContext* redis_get_context(void)
- {
- return rctx;
- }
- int redis_flush(void)
- {
- redisReply *reply = (redisReply*) redisCommand(rctx, "FLUSHDB");
- if (chkerr(rctx, reply)) {
- return -1;
- }
- freeReplyObject(reply);
- return 0;
- }
- int redis_existconf(const char *name)
- {
- assert(rctx);
- redisReply *reply = (redisReply*) redisCommand(rctx, "EXISTS %s", name);
- if (chkerr(rctx, reply)) {
- return -1;
- }
- int v = reply->integer;
- freeReplyObject(reply);
- return v;
- }
- int redis_delconf(const char *name)
- {
- assert(rctx);
- redisReply *reply = (redisReply*) redisCommand(rctx, "DEL %s", name);
- if (chkerr(rctx, reply)) {
- return -1;
- }
- freeReplyObject(reply);
- return 0;
- }
- int redis_setconf(const char *name, char *value)
- {
- assert(rctx);
- redisReply *reply = (redisReply*) redisCommand(rctx, "SET %s %s",
- name, value);
- if (chkerr(rctx, reply)) {
- return -1;
- }
- freeReplyObject(reply);
- return 0;
- }
- int redis_getconf(const char *name, char *buf, size_t maxlen)
- {
- assert(rctx);
- redisReply *reply = (redisReply*) redisCommand(rctx, "GET %s", name);
- if (chkerr(rctx, reply)) {
- return -1;
- }
- strncpy(buf, reply->str, maxlen);
- freeReplyObject(reply);
- return 0;
- }
- uint32_t redis_getconf_uint32_t(const char *key)
- {
- assert(rctx);
- char buf[50];
- redis_getconf(key, buf, 50);
- return atoi(buf);
- }
- int redis_setconf_uint32_t(const char *key, uint32_t value)
- {
- assert(rctx);
- char buf[50];
- sprintf(buf, "%u", value);
- return redis_setconf(key, buf);
- }
- static long redis_get_sizeof(const char *cmd, const char *name)
- {
- assert(rctx);
- redisReply *reply;
- reply = (redisReply*) redisCommand(rctx, "%s %s", cmd, name);
- assert(reply);
- assert(reply->type == REDIS_REPLY_INTEGER);
- long rtr = reply->integer;
- freeReplyObject(reply);
- return rtr;
- }
/* Number of elements in the redis list `name`, obtained with LLEN. */
long redis_get_sizeof_list(const char *name)
{
	const long length = redis_get_sizeof("LLEN", name);
	return length;
}
/* Number of members in the redis set `name`, obtained with SCARD. */
long redis_get_sizeof_set(const char *name)
{
	const long cardinality = redis_get_sizeof("SCARD", name);
	return cardinality;
}
- int redis_pull(char *redisqueuename, void *buf,
- int maxload, size_t obj_size, int *numloaded, const char* cmd)
- {
- assert(rctx);
- long elems_in_redis = redis_get_sizeof_list(redisqueuename);
- long num_to_add = MIN(elems_in_redis, maxload);
- log_info("redis", "INFO: redis load called on %s. Transferring %li "
- "of %li elements to in-memory queue.",
- redisqueuename,
- num_to_add, elems_in_redis);
- for(int i=0; i < num_to_add; i++) {
- redisAppendCommand(rctx, "%s %s", cmd, redisqueuename);
- }
- for(int i=0; i < num_to_add; i++) {
- redisReply *reply;
- int rc = redisGetReply(rctx, (void**) &reply);
- if (rc != REDIS_OK) {
- log_fatal("redis", "response from redis != REDIS_OK");
- return -1;
- }
- if (!reply) {
- log_fatal("redis", "no reply provided by redis.");
- return -1;
- }
- if (reply->type != REDIS_REPLY_STRING) {
- log_fatal("redis",
- "unxpected reply type from redis.");
- return -1;
- }
- if ((size_t)reply->len != obj_size) {
- log_fatal("redis", "ERROR: unexpected lengthed "
- "object provided by redis.\n");
- return -1;
- }
- memcpy((void*)((intptr_t)buf+i*obj_size), reply->str, obj_size);
- freeReplyObject(reply);
- }
- *numloaded = num_to_add;
- return 0;
- }
- int redis_lpull(char *redisqueuename, void *buf,
- int maxload, size_t obj_size, int *numloaded)
- {
- return redis_pull(redisqueuename, buf,
- maxload, obj_size, numloaded, "LPOP");
- }
- int redis_spull(char *redisqueuename, void *buf,
- int maxload, size_t obj_size, int *numloaded)
- {
- return redis_pull(redisqueuename, buf,
- maxload, obj_size, numloaded, "SRAND");
- }
- static int redis_pull_one(redisContext *rctx, char *queuename, void **buf, size_t *len, const char *cmd)
- {
- assert(rctx);
- redisReply *reply = (redisReply*) redisCommand(rctx, "%s %s", cmd, queuename);
- if (!reply) {
- log_fatal("redis", "no reply provided by redis.");
- }
- if (reply-> type == REDIS_REPLY_NIL) {
- return REDIS_EMPTY;
- }
- if (reply->type != REDIS_REPLY_STRING) {
- log_fatal("redis", "redis unxpected reply type from redis: %s", reply->str);
- }
- *len = reply->len;
- void *temp = (char*) malloc(*len);
- assert(temp);
- *buf = temp;
- memcpy(temp, reply->str, *len);
- freeReplyObject(reply);
- return REDIS_SUCCESS;
- }
- int redis_lpull_one(redisContext *rctx, char *queuename, void **buf, size_t *len)
- {
- return redis_pull_one(rctx, queuename, buf, len, "LPOP");
- }
- int redis_spull_one(redisContext *rctx, char *queuename, void **buf, size_t *len)
- {
- return redis_pull_one(rctx, queuename, buf, len, "SRAND");
- }
- static int redis_push(char *redisqueuename,
- void *buf, int num, size_t len, const char *cmd)
- {
- assert(rctx);
- for (int i=0; i < num; i++) {
- void* load = (void*)((intptr_t)buf + i*len);
- int rc = redisAppendCommand(rctx, "%s %s %b",
- cmd, redisqueuename, load, len);
- if (rc != REDIS_OK || rctx->err) {
- log_fatal("redis", "%s", rctx->errstr);
- }
- }
- redisReply *reply;
- for (int i=0; i < num; i++) {
- if (redisGetReply(rctx, (void**) &reply) != REDIS_OK || rctx->err) {
- log_fatal("redis","%s", rctx->errstr);
- }
- if (reply->type == REDIS_REPLY_ERROR) {
- log_fatal("redis", "%s", rctx->errstr);
- }
- freeReplyObject(reply);
- }
- return 0;
- }
/*
 * Append `num` objects of `len` bytes each to the redis list
 * `redisqueuename` using RPUSH.
 */
int redis_lpush(char *redisqueuename,
		void *buf, int num, size_t len)
{
	const int status = redis_push(redisqueuename, buf, num, len, "RPUSH");
	return status;
}
/*
 * Add `num` objects of `len` bytes each to the redis set
 * `redisqueuename` using SADD.
 */
int redis_spush(char *redisqueuename,
		void *buf, int num, size_t len)
{
	const int status = redis_push(redisqueuename, buf, num, len, "SADD");
	return status;
}
- static int redis_push_one(redisContext *rctx, char *queuename, void *buf, size_t len, const char *cmd)
- {
- assert(rctx);
- redisReply *reply = (redisReply*) redisCommand(rctx, "%s %s %b", cmd, queuename, buf, len);
- if (chkerr(rctx, reply)) {
- return -1;
- }
- freeReplyObject(reply);
- return 0;
- }
- int redis_lpush_one(redisContext *rctx, char *queuename,
- void *buf, size_t len)
- {
- return redis_push_one(rctx, queuename, buf, len, "RPUSH");
- }
- int redis_spush_one(redisContext *rctx, char *queuename,
- void *buf, size_t len)
- {
- return redis_push_one(rctx, queuename, buf, len, "SADD");
- }
- static int redis_push_strings(char *redisqueuename, char **buf, int num, const char *cmd)
- {
- assert(rctx);
- for (int i=0; i < num; i++) {
- int rc = redisAppendCommand(rctx, "%s %s %s", cmd, redisqueuename, buf[i]);
- if (rc != REDIS_OK || rctx->err) {
- log_fatal("redis", "%s", rctx->errstr);
- }
- }
- redisReply *reply;
- for (int i=0; i < num; i++) {
- if (redisGetReply(rctx, (void**) &reply) != REDIS_OK || rctx->err) {
- log_fatal("redis","%s", rctx->errstr);
- }
- if (reply->type == REDIS_REPLY_ERROR) {
- log_fatal("redis", "%s", rctx->errstr);
- }
- freeReplyObject(reply);
- }
- return 0;
- }
/* Append `num` strings to the redis list `redisqueuename` using RPUSH. */
int redis_lpush_strings(char *redisqueuename, char **buf, int num)
{
	const int status = redis_push_strings(redisqueuename, buf, num, "RPUSH");
	return status;
}
/* Add `num` strings to the redis set `redisqueuename` using SADD. */
int redis_spush_strings(char *redisqueuename, char **buf, int num)
{
	const int status = redis_push_strings(redisqueuename, buf, num, "SADD");
	return status;
}
|