redis.c 9.6 KB


  1. /*
  2. * ZMap Redis Helpers Copyright 2013 Regents of the University of Michigan
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  5. * use this file except in compliance with the License. You may obtain a copy
  6. * of the License at http://www.apache.org/licenses/LICENSE-2.0
  7. */
  8. #include "redis.h"
  9. #include <string.h>
  10. #include <stdlib.h>
  11. #include <stdio.h>
  12. #include <stdint.h>
  13. #include <assert.h>
  14. #include <hiredis/hiredis.h>
  15. #include "logger.h"
  16. #include "xalloc.h"
  17. #define REDIS_TIMEOUT 2
  18. #undef MIN
  19. #define MIN(X,Y) ((X) < (Y) ? (X) : (Y))
  20. static redisContext *rctx;
  21. redisconf_t *redis_parse_connstr(char *connstr)
  22. {
  23. redisconf_t *retv = xmalloc(sizeof(redisconf_t));
  24. if (!strncmp("tcp://", connstr, 6)) {
  25. char *servername = xmalloc(strlen(connstr));
  26. char *list_name = xmalloc(strlen(connstr));
  27. uint32_t port;
  28. if (sscanf(connstr, "tcp://%[^:]:%u/%s", servername,
  29. &port, list_name) != 3) {
  30. log_fatal("redis", "unable to parse redis connection string. This "
  31. "should be of the form tcp://server:port/list-name "
  32. "for TCP connections. All fields are required.");
  33. }
  34. retv->type = T_TCP;
  35. retv->server = servername;
  36. retv->port = port;
  37. retv->list_name = list_name;
  38. retv->path = NULL;
  39. } else if (!strncmp("local://", connstr, 8)) {
  40. // looking for something along the lines of
  41. // local:///tmp/redis.sock/list-name
  42. char *path = xmalloc(strlen(connstr));
  43. char *list_name = xmalloc(strlen(connstr));
  44. connstr = connstr + (size_t) 8;
  45. char *listname = strrchr(connstr, '/') + (size_t) 1;
  46. connstr[strrchr(connstr, '/') - connstr] = '\0';
  47. strcpy(path, connstr);
  48. strcpy(list_name, listname);
  49. retv->type = T_LOCAL;
  50. retv->list_name = list_name;
  51. retv->path = path;
  52. retv->server = NULL;
  53. retv->port = 0;
  54. } else {
  55. log_fatal("redis", "unable to parse connection string. does not begin with "
  56. "local:// or tcp:// as expected");
  57. }
  58. return retv;
  59. }
  60. redisContext* redis_connect(char *connstr)
  61. {
  62. redisconf_t *c;
  63. // handle old behavior where we only connected to a specific
  64. // socket that we #defined.
  65. if (!connstr) {
  66. c = xmalloc(sizeof(redisconf_t));
  67. c->type = T_LOCAL;
  68. c->path = strdup("/tmp/redis.sock");
  69. } else {
  70. c = redis_parse_connstr(connstr);
  71. assert(c);
  72. }
  73. struct timeval timeout;
  74. timeout.tv_sec = REDIS_TIMEOUT;
  75. timeout.tv_usec = 0;
  76. if (c->type == T_LOCAL) {
  77. return (redisContext*) redisConnectUnixWithTimeout(c->path,
  78. timeout);
  79. } else {
  80. return (redisContext*) redisConnectWithTimeout(c->server,
  81. c->port, timeout);
  82. }
  83. }
  84. static int chkerr(redisContext *rctx, redisReply *reply)
  85. {
  86. if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
  87. log_error("redis", "an error occurred when "
  88. "retrieving item from redis: %s",
  89. rctx->errstr);
  90. if (reply) {
  91. freeReplyObject(reply);
  92. }
  93. return -1;
  94. }
  95. return 0;
  96. }
  97. int redis_init(char *connstr)
  98. {
  99. rctx = redis_connect(connstr);
  100. if (!rctx) {
  101. return -1;
  102. }
  103. return 0;
  104. }
  105. int redis_close(void)
  106. {
  107. redisFree(rctx);
  108. return 0;
  109. }
  110. redisContext* redis_get_context(void)
  111. {
  112. return rctx;
  113. }
  114. int redis_flush(void)
  115. {
  116. redisReply *reply = (redisReply*) redisCommand(rctx, "FLUSHDB");
  117. if (chkerr(rctx, reply)) {
  118. return -1;
  119. }
  120. freeReplyObject(reply);
  121. return 0;
  122. }
  123. int redis_existconf(const char *name)
  124. {
  125. assert(rctx);
  126. redisReply *reply = (redisReply*) redisCommand(rctx, "EXISTS %s", name);
  127. if (chkerr(rctx, reply)) {
  128. return -1;
  129. }
  130. int v = reply->integer;
  131. freeReplyObject(reply);
  132. return v;
  133. }
  134. int redis_delconf(const char *name)
  135. {
  136. assert(rctx);
  137. redisReply *reply = (redisReply*) redisCommand(rctx, "DEL %s", name);
  138. if (chkerr(rctx, reply)) {
  139. return -1;
  140. }
  141. freeReplyObject(reply);
  142. return 0;
  143. }
  144. int redis_setconf(const char *name, char *value)
  145. {
  146. assert(rctx);
  147. redisReply *reply = (redisReply*) redisCommand(rctx, "SET %s %s",
  148. name, value);
  149. if (chkerr(rctx, reply)) {
  150. return -1;
  151. }
  152. freeReplyObject(reply);
  153. return 0;
  154. }
  155. int redis_getconf(const char *name, char *buf, size_t maxlen)
  156. {
  157. assert(rctx);
  158. redisReply *reply = (redisReply*) redisCommand(rctx, "GET %s", name);
  159. if (chkerr(rctx, reply)) {
  160. return -1;
  161. }
  162. strncpy(buf, reply->str, maxlen);
  163. freeReplyObject(reply);
  164. return 0;
  165. }
  166. uint32_t redis_getconf_uint32_t(const char *key)
  167. {
  168. assert(rctx);
  169. char buf[50];
  170. redis_getconf(key, buf, 50);
  171. return atoi(buf);
  172. }
  173. int redis_setconf_uint32_t(const char *key, uint32_t value)
  174. {
  175. assert(rctx);
  176. char buf[50];
  177. sprintf(buf, "%u", value);
  178. return redis_setconf(key, buf);
  179. }
  180. static long redis_get_sizeof(const char *cmd, const char *name)
  181. {
  182. assert(rctx);
  183. redisReply *reply;
  184. reply = (redisReply*) redisCommand(rctx, "%s %s", cmd, name);
  185. assert(reply);
  186. assert(reply->type == REDIS_REPLY_INTEGER);
  187. long rtr = reply->integer;
  188. freeReplyObject(reply);
  189. return rtr;
  190. }
  /**
   * Length of a list key, obtained with LLEN.
   *
   * @param name  list key.
   * @return the value produced by redis_get_sizeof() for LLEN.
   */
  long redis_get_sizeof_list(const char *name)
  {
      long length = redis_get_sizeof("LLEN", name);
      return length;
  }
  /**
   * Cardinality of a set key, obtained with SCARD.
   *
   * @param name  set key.
   * @return the value produced by redis_get_sizeof() for SCARD.
   */
  long redis_get_sizeof_set(const char *name)
  {
      long cardinality = redis_get_sizeof("SCARD", name);
      return cardinality;
  }
  199. int redis_pull(char *redisqueuename, void *buf,
  200. int maxload, size_t obj_size, int *numloaded, const char* cmd)
  201. {
  202. assert(rctx);
  203. long elems_in_redis = redis_get_sizeof_list(redisqueuename);
  204. long num_to_add = MIN(elems_in_redis, maxload);
  205. log_info("redis", "INFO: redis load called on %s. Transferring %li "
  206. "of %li elements to in-memory queue.",
  207. redisqueuename,
  208. num_to_add, elems_in_redis);
  209. for(int i=0; i < num_to_add; i++) {
  210. redisAppendCommand(rctx, "%s %s", cmd, redisqueuename);
  211. }
  212. for(int i=0; i < num_to_add; i++) {
  213. redisReply *reply;
  214. int rc = redisGetReply(rctx, (void**) &reply);
  215. if (rc != REDIS_OK) {
  216. log_fatal("redis", "response from redis != REDIS_OK");
  217. return -1;
  218. }
  219. if (!reply) {
  220. log_fatal("redis", "no reply provided by redis.");
  221. return -1;
  222. }
  223. if (reply->type != REDIS_REPLY_STRING) {
  224. log_fatal("redis",
  225. "unxpected reply type from redis.");
  226. return -1;
  227. }
  228. if ((size_t)reply->len != obj_size) {
  229. log_fatal("redis", "ERROR: unexpected lengthed "
  230. "object provided by redis.\n");
  231. return -1;
  232. }
  233. memcpy((void*)((intptr_t)buf+i*obj_size), reply->str, obj_size);
  234. freeReplyObject(reply);
  235. }
  236. *numloaded = num_to_add;
  237. return 0;
  238. }
  /**
   * Pull objects from a list key by delegating to redis_pull() with LPOP.
   *
   * Parameters and return value are those of redis_pull().
   */
  int redis_lpull(char *redisqueuename, void *buf,
                  int maxload, size_t obj_size, int *numloaded)
  {
      const char *pop_cmd = "LPOP";
      int status = redis_pull(redisqueuename, buf, maxload, obj_size,
                              numloaded, pop_cmd);
      return status;
  }
  /**
   * Pull objects from a set key by delegating to redis_pull().
   *
   * Parameters and return value are those of redis_pull().
   */
  int redis_spull(char *redisqueuename, void *buf,
                  int maxload, size_t obj_size, int *numloaded)
  {
      /* "SRAND" is not a redis command, so the server rejected every
       * request. SRANDMEMBER returns a random member without removing it.
       * NOTE(review): if members are meant to be consumed the way LPOP
       * consumes list entries, SPOP is the command to use -- confirm. */
      return redis_pull(redisqueuename, buf,
                        maxload, obj_size, numloaded, "SRANDMEMBER");
  }
  251. static int redis_pull_one(redisContext *rctx, char *queuename, void **buf, size_t *len, const char *cmd)
  252. {
  253. assert(rctx);
  254. redisReply *reply = (redisReply*) redisCommand(rctx, "%s %s", cmd, queuename);
  255. if (!reply) {
  256. log_fatal("redis", "no reply provided by redis.");
  257. }
  258. if (reply-> type == REDIS_REPLY_NIL) {
  259. return REDIS_EMPTY;
  260. }
  261. if (reply->type != REDIS_REPLY_STRING) {
  262. log_fatal("redis", "redis unxpected reply type from redis: %s", reply->str);
  263. }
  264. *len = reply->len;
  265. void *temp = (char*) malloc(*len);
  266. assert(temp);
  267. *buf = temp;
  268. memcpy(temp, reply->str, *len);
  269. freeReplyObject(reply);
  270. return REDIS_SUCCESS;
  271. }
  272. int redis_lpull_one(redisContext *rctx, char *queuename, void **buf, size_t *len)
  273. {
  274. return redis_pull_one(rctx, queuename, buf, len, "LPOP");
  275. }
  276. int redis_spull_one(redisContext *rctx, char *queuename, void **buf, size_t *len)
  277. {
  278. return redis_pull_one(rctx, queuename, buf, len, "SRAND");
  279. }
  280. static int redis_push(char *redisqueuename,
  281. void *buf, int num, size_t len, const char *cmd)
  282. {
  283. assert(rctx);
  284. for (int i=0; i < num; i++) {
  285. void* load = (void*)((intptr_t)buf + i*len);
  286. int rc = redisAppendCommand(rctx, "%s %s %b",
  287. cmd, redisqueuename, load, len);
  288. if (rc != REDIS_OK || rctx->err) {
  289. log_fatal("redis", "%s", rctx->errstr);
  290. }
  291. }
  292. redisReply *reply;
  293. for (int i=0; i < num; i++) {
  294. if (redisGetReply(rctx, (void**) &reply) != REDIS_OK || rctx->err) {
  295. log_fatal("redis","%s", rctx->errstr);
  296. }
  297. if (reply->type == REDIS_REPLY_ERROR) {
  298. log_fatal("redis", "%s", rctx->errstr);
  299. }
  300. freeReplyObject(reply);
  301. }
  302. return 0;
  303. }
  /**
   * Append objects to a list key by delegating to redis_push() with RPUSH.
   *
   * Parameters and return value are those of redis_push().
   */
  int redis_lpush(char *redisqueuename,
                  void *buf, int num, size_t len)
  {
      const char *push_cmd = "RPUSH";
      int status = redis_push(redisqueuename, buf, num, len, push_cmd);
      return status;
  }
  /**
   * Add objects to a set key by delegating to redis_push() with SADD.
   *
   * Parameters and return value are those of redis_push().
   */
  int redis_spush(char *redisqueuename,
                  void *buf, int num, size_t len)
  {
      const char *push_cmd = "SADD";
      int status = redis_push(redisqueuename, buf, num, len, push_cmd);
      return status;
  }
  314. static int redis_push_one(redisContext *rctx, char *queuename, void *buf, size_t len, const char *cmd)
  315. {
  316. assert(rctx);
  317. redisReply *reply = (redisReply*) redisCommand(rctx, "%s %s %b", cmd, queuename, buf, len);
  318. if (chkerr(rctx, reply)) {
  319. return -1;
  320. }
  321. freeReplyObject(reply);
  322. return 0;
  323. }
  324. int redis_lpush_one(redisContext *rctx, char *queuename,
  325. void *buf, size_t len)
  326. {
  327. return redis_push_one(rctx, queuename, buf, len, "RPUSH");
  328. }
  329. int redis_spush_one(redisContext *rctx, char *queuename,
  330. void *buf, size_t len)
  331. {
  332. return redis_push_one(rctx, queuename, buf, len, "SADD");
  333. }
  334. static int redis_push_strings(char *redisqueuename, char **buf, int num, const char *cmd)
  335. {
  336. assert(rctx);
  337. for (int i=0; i < num; i++) {
  338. int rc = redisAppendCommand(rctx, "%s %s %s", cmd, redisqueuename, buf[i]);
  339. if (rc != REDIS_OK || rctx->err) {
  340. log_fatal("redis", "%s", rctx->errstr);
  341. }
  342. }
  343. redisReply *reply;
  344. for (int i=0; i < num; i++) {
  345. if (redisGetReply(rctx, (void**) &reply) != REDIS_OK || rctx->err) {
  346. log_fatal("redis","%s", rctx->errstr);
  347. }
  348. if (reply->type == REDIS_REPLY_ERROR) {
  349. log_fatal("redis", "%s", rctx->errstr);
  350. }
  351. freeReplyObject(reply);
  352. }
  353. return 0;
  354. }
  /**
   * Append strings to a list key by delegating to redis_push_strings() with
   * RPUSH.
   *
   * Parameters and return value are those of redis_push_strings().
   */
  int redis_lpush_strings(char *redisqueuename, char **buf, int num)
  {
      const char *push_cmd = "RPUSH";
      int status = redis_push_strings(redisqueuename, buf, num, push_cmd);
      return status;
  }
  /**
   * Add strings to a set key by delegating to redis_push_strings() with
   * SADD.
   *
   * Parameters and return value are those of redis_push_strings().
   */
  int redis_spush_strings(char *redisqueuename, char **buf, int num)
  {
      const char *push_cmd = "SADD";
      int status = redis_push_strings(redisqueuename, buf, num, push_cmd);
      return status;
  }