Skip to content

Commit 6e40a04

Browse files
authored
Add LREM and LMOVE (closes #54) (#55)
* Work in progress extension for LMOVE (works) and LREM (does not) Also contains variant of queueDemo by @Enchufa2 Plus random cleanups in Redis.cpp which needed it * Correct size calculation for LPUSH, RPUSH, LREM * LREM repaired so no need for extra cleanup * Whitespace and comment cleanup * Clean example a little * Added unit tests, simplified quick check
1 parent 9397d72 commit 6e40a04

File tree

4 files changed

+276
-136
lines changed

4 files changed

+276
-136
lines changed

DESCRIPTION

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Package: RcppRedis
22
Type: Package
33
Title: 'Rcpp' Bindings for 'Redis' using the 'hiredis' Library
4-
Version: 0.2.5
4+
Version: 0.2.5.1
55
Date: 2025-03-26
66
Authors@R: c(person("Dirk", "Eddelbuettel", role = c("aut", "cre"), email = "[email protected]",
77
comment = c(ORCID = "0000-0001-6419-907X")),

inst/demos/queueDemo.R

+149
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
## Initial version by Iñaki Ucar using Redux
2+
## Adapted to RcppRedis by Dirk Eddelbuettel
3+
4+
useRcppRedis <- function() {
5+
6+
ensure_queue <- function(name) {
7+
##list(con = redux::hiredis(), name = name, temp = paste0(name, "_temp"))
8+
redis <- new(RcppRedis::Redis)
9+
list(con = redis, name = name, temp = paste0(name, "_temp"))
10+
}
11+
12+
publish <- function(queue, message) {
13+
##invisible(queue$con$LPUSH(queue$name, message))
14+
invisible(queue$con$lpush(queue$name, message))
15+
}
16+
17+
list_messages <- function(queue) {
18+
##list(READY = queue$con$LRANGE(queue$name, 0, -1),
19+
## PROCESSING = queue$con$LRANGE(queue$temp, 0, -1))
20+
list(READY = queue$con$lrange(queue$name, 0, -1),
21+
PROCESSING = queue$con$lrange(queue$temp, 0, -1))
22+
}
23+
24+
try_consume <- function(queue) {
25+
##message <- queue$con$RPOPLPUSH(queue$name, queue$temp)
26+
message <- queue$con$lmove(queue$name, queue$temp, 'RIGHT', 'LEFT')
27+
if (is.null(message)) return(message)
28+
list(queue = queue, message = message)
29+
}
30+
31+
ack <- function(message) {
32+
##invisible(message$queue$con$LREM(message$queue$temp, 1, message$message))
33+
invisible(message$queue$con$lrem(message$queue$temp, 1, message$message))
34+
}
35+
36+
#############################################################################
37+
38+
##system("docker run -d --rm --name valkey -p 6379:6379 valkey/valkey")
39+
## assume Redis running
40+
41+
q <- ensure_queue("jobs")
42+
##q
43+
44+
publish(q, message = "Hello world!")
45+
publish(q, message = "Hello again!")
46+
cat("\n--Messages after two enqueus\n")
47+
str(list_messages(q))
48+
49+
msg <- try_consume(q)
50+
cat("\n--Consumed message 1\n")
51+
msg$message
52+
cat("\n--Messages after consume\n")
53+
str(list_messages(q))
54+
55+
print(ack(msg))
56+
cat("\n--Messages after ack 1\n")
57+
str(list_messages(q))
58+
59+
msg2 <- try_consume(q)
60+
cat("\n--Consumed message 2\n")
61+
msg2$message
62+
print(ack(msg2))
63+
cat("\n--Messages after ack 2\n")
64+
str(list_messages(q))
65+
66+
cat("\n--Final try consume\n")
67+
str(try_consume(q))
68+
69+
##system("docker stop valkey")
70+
}
71+
72+
useRedux <- function() {
73+
ensure_queue <- function(name) {
74+
list(con = redux::hiredis(), name = name, temp = paste0(name, "_temp"))
75+
}
76+
77+
publish <- function(queue, message) {
78+
invisible(queue$con$LPUSH(queue$name, message))
79+
}
80+
81+
list_messages <- function(queue) {
82+
list(READY = queue$con$LRANGE(queue$name, 0, -1),
83+
PROCESSING = queue$con$LRANGE(queue$temp, 0, -1))
84+
}
85+
86+
try_consume <- function(queue) {
87+
message <- queue$con$RPOPLPUSH(queue$name, queue$temp)
88+
if (is.null(message)) return(message)
89+
list(queue = queue, message = message)
90+
}
91+
92+
ack <- function(message) {
93+
invisible(message$queue$con$LREM(message$queue$temp, 1, message$message))
94+
}
95+
96+
#############################################################################
97+
98+
## #system("docker run -d --rm --name valkey -p 6379:6379 valkey/valkey")
99+
100+
q <- ensure_queue("jobs")
101+
##q
102+
103+
publish(q, message = "Hello world!")
104+
publish(q, message = "Hello again!")
105+
cat("\n--Messages after two enqueus\n")
106+
str(list_messages(q))
107+
108+
msg <- try_consume(q)
109+
cat("\n--Consumed message 1\n")
110+
msg$message
111+
cat("\n--Messages after consume\n")
112+
str(list_messages(q))
113+
114+
print(ack(msg))
115+
cat("\n--Messages after ack 1\n")
116+
str(list_messages(q))
117+
118+
msg2 <- try_consume(q)
119+
cat("\n--Consumed message 2\n")
120+
msg2$message
121+
print(ack(msg2))
122+
cat("\n--Messages after ack 2\n")
123+
str(list_messages(q))
124+
125+
cat("\n--Final try consume\n")
126+
str(try_consume(q))
127+
128+
##system("docker stop valkey")
129+
130+
}
131+
132+
quickCheck <- function() {
133+
redis <- new(RcppRedis::Redis)
134+
redis$lpush("foo", "banana")
135+
redis$lpush("foo", "banana")
136+
#print(str(redis$lrange("foo", 0, -1)))
137+
print(redis$llen("foo"))
138+
redis$lrem("foo", 1, "banana")
139+
#print(str(redis$lrange("foo", 0, -1)))
140+
print(redis$llen("foo"))
141+
redis$lpop("foo")
142+
#print(str(redis$lrange("foo", 0, -1)))
143+
print(redis$llen("foo"))
144+
invisible(NULL)
145+
}
146+
147+
#useRcppRedis()
148+
#useRedux()
149+
quickCheck()

inst/tinytest/test_list.R

+33
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,36 @@ redis$exec(paste("del", key))
6767
## delete key
6868
n <- redis$exec(paste("del", key))
6969
expect_equal(n, 0)
70+
71+
72+
## check lrem
73+
redis$exec(paste("del", key))
74+
elem <- "abc"
75+
redis$lpush(key, elem)
76+
redis$lpush(key, elem)
77+
redis$lpush(key, elem)
78+
redis$lpush(key, elem)
79+
expect_equal(redis$llen(key), 4)
80+
redis$lrem(key, 1, elem)
81+
expect_equal(redis$llen(key), 3)
82+
redis$lrem(key, -1, elem)
83+
expect_equal(redis$llen(key), 2)
84+
expect_equal(redis$lpop(key), elem)
85+
expect_equal(redis$lpop(key), elem)
86+
expect_equal(redis$keys(key), character())
87+
88+
## check lmove
89+
altkey <- "RcppRedis:test:myotherlist"
90+
redis$exec(paste("del", altkey))
91+
redis$rpush(key, 1)
92+
redis$rpush(key, 2)
93+
redis$rpush(key, 3)
94+
redis$lmove(key, altkey, "LEFT", "RIGHT")
95+
expect_equal(redis$llen(key), 2)
96+
expect_equal(redis$llen(altkey), 1)
97+
redis$lmove(key, altkey, "LEFT", "LEFT")
98+
expect_equal(redis$llen(key), 1)
99+
expect_equal(redis$llen(altkey), 2)
100+
expect_equal(redis$lpop(key), 3) # as 1 and 2 have been moved (both from left)
101+
expect_equal(redis$lpop(altkey), 2) # as 2 was inserted to the left
102+
expect_equal(redis$lpop(altkey), 1) # as 1 remains

0 commit comments

Comments
 (0)