Build a Delay Queue with Redis
Contents
delay queue block diagram by YouZan
Job
// Job is the unit of work handled by the delay queue. It is stored in the job
// pool as JSON and referenced everywhere else by its Id.
type Job struct {
// Id uniquely identifies the job; Push assigns it from a freshly generated UUID.
Id string `json:"id"`
// Topic selects the ready queue the job is moved to once it is due.
Topic string `json:"topic"`
// Delay is how many seconds after Push the job becomes ready
// (it is added to the current Unix time to form the bucket score).
Delay int64 `json:"delay"`
// TTR ("time to run") is the number of seconds after a Pop at which the job
// is scheduled again; Pop only re-schedules the job when TTR > 0.
TTR int64 `json:"ttr"` // time to run; re-delivery delay used for reliability
// Body is the opaque payload; Pop returns it to the consumer as response data.
Body string `json:"body"` // service data
}
Job Pool
Redis hash
job’s Id $ \rightarrow $ hash field (all jobs live in a single hash stored under the key `dq:pool`)
metadata of the job (JSON-encoded) $ \rightarrow $ hash value
// poolAdd writes the serialized job into the pool hash, using the job's Id as
// the hash field. It returns the error reported by the HSET command, if any.
func poolAdd(jobId string, data []byte) error {
	cmd := rdb.HSet(background, PoolKey, jobId, data)
	return cmd.Err()
}
// poolRem deletes the field named by the job's Id from the pool hash.
// It returns the error reported by the HDEL command, if any.
func poolRem(jobId string) error {
	cmd := rdb.HDel(background, PoolKey, jobId)
	return cmd.Err()
}
// poolGetJob reads the entry stored under jobId in the pool hash and decodes
// its JSON payload into a Job.
//
// It returns nil when the HGET call reports an error or when the stored
// payload cannot be unmarshalled; callers cannot tell the two cases apart.
func poolGetJob(jobId string) *Job {
	raw, err := rdb.HGet(background, PoolKey, jobId).Result()
	if err != nil {
		return nil
	}

	var decoded Job
	if json.Unmarshal([]byte(raw), &decoded) != nil {
		return nil
	}
	return &decoded
}
Ready Queue
Redis list
job’s Id $ \rightarrow $ list member
func queuePop(topic string) (string, error) {
key := fmt.Sprintf(QueueKeyPrefix, topic)
return rdb.RPop(background, key).Result()
}
func queueAdd(topic, jobId string) error {
queueKey := fmt.Sprintf(QueueKeyPrefix, topic)
return rdb.LPush(background, queueKey, jobId).Err()
}
Delay Bucket
Redis sorted set (zset)
job’s Id $ \rightarrow $ zset member
absolute run time of the job (enqueue time + delay, as a Unix timestamp in seconds) $ \rightarrow $ zset score
multiple buckets will improve the polling efficiency
func bucketAdd(jobId string, jobDelay int64) error {
bucketNum := <- bucketNumChan
key := fmt.Sprintf(BucketKeyPrefix, bucketNum)
return rdb.ZAdd(background, key,
&redis.Z{Score: float64(time.Now().Unix()+jobDelay), Member: jobId}).Err()
}
func bucketRem(bucketNum int, jobId string) error {
key := fmt.Sprintf(BucketKeyPrefix, bucketNum)
return rdb.ZRem(background, key, jobId).Err()
}
// bucketGetFirst returns the lowest-scored entry (rank 0) of the given bucket,
// i.e. the job that is due soonest, or nil when the query yields no entries.
// Any command error is not surfaced: only the returned values are inspected.
func bucketGetFirst(bucketNum int) *redis.Z {
	bucketKey := fmt.Sprintf(BucketKeyPrefix, bucketNum)
	head := rdb.ZRangeWithScores(background, bucketKey, 0, 0).Val()
	if len(head) > 0 {
		return &head[0]
	}
	return nil
}
Timer
// initTimers starts one ticker and one polling goroutine per delay bucket.
//
// Each goroutine runs poolingBucket once per second for the lifetime of the
// process; the tickers are kept in the package-level timers slice.
func initTimers() {
	timers = make([]*time.Ticker, BucketSize)
	for i := range timers {
		timers[i] = time.NewTicker(1 * time.Second)
		// Bucket numbers handed out through bucketNumChan (and therefore used
		// by bucketAdd) run from 1 to BucketSize, so the pollers must watch
		// 1..BucketSize as well. Polling 0..BucketSize-1 would leave the last
		// bucket unwatched and its jobs would never become ready.
		go poolingBucket(timers[i], i+1)
	}
}
func poolingBucket(ticker *time.Ticker, bucketNum int){
bucketKey := fmt.Sprintf(BucketKeyPrefix, bucketNum)
for {
select {
case t := <-ticker.C:
bucketItem := bucketGetFirst(bucketNum)
if bucketItem == nil{
// bucket is empty
continue
}
// does not come the time to run
if int64(bucketItem.Score) > t.Unix() {
continue
}
jobId := bucketItem.Member.(string)
job := poolGetJob(jobId)
if job == nil {
// job is not found in the pool (usually is deleted)
bucketRem(bucketNum, jobId)
continue
}
if err := queueAdd(job.Topic, job.Id); err != nil {
continue
}
bucketRem(bucketNum, job.Id)
}
}
}
Delay Queue Server
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/go-basic/uuid"
"github.com/go-redis/redis/v8"
"io/ioutil"
"net/http"
"time"
)
// Redis key names and sizing of the delay queue.
const (
// PoolKey is the Redis hash holding every job's JSON, one field per job Id.
PoolKey = "dq:pool"
// QueueKeyPrefix is the format of a ready-queue list key; %s is the topic.
QueueKeyPrefix = "dq:queue:%s"
// BucketSize is the number of delay buckets (and of polling tickers).
BucketSize = 5
// BucketKeyPrefix is the format of a delay-bucket sorted-set key; %d is the bucket number.
BucketKeyPrefix = "dq:bucket:%d"
)
// rdb is the shared Redis client; main assigns it before the server starts.
var rdb *redis.Client
// timers holds one ticker per delay bucket; initTimers fills it.
var timers []*time.Ticker
// bucketNumChan yields the bucket number bucketAdd should use next;
// initBucketNumChan assigns it.
var bucketNumChan <-chan int
// background is the context passed to every Redis command in this file.
var background = context.Background()
// Resp is the JSON envelope written by every HTTP handler in this file.
type Resp struct {
// Code is 200 for bodies built by generateSuccessBody and 500 for
// bodies built by generateFailureBody.
Code int `json:"code"`
// Message carries the error text on failure; handlers pass "" on success.
Message string `json:"message"`
// Data is the optional payload (Pop puts the job body here).
Data interface{} `json:"data"`
}
// Push handles a request to enqueue a new delayed job.
//
// The request body must be a JSON-encoded Job with a non-empty topic.
// Negative Delay and TTR values are clamped to 0 and the Id is always
// replaced by a freshly generated UUID. The job is stored in the pool and
// then scheduled in a delay bucket. Every outcome is reported through a Resp
// envelope written to resp.
func Push(resp http.ResponseWriter, req *http.Request) {
	fail := func(msg string) {
		resp.Write(generateFailureBody(msg))
	}

	body, err := ioutil.ReadAll(req.Body)
	if err != nil {
		fail(err.Error())
		return
	}
	job := new(Job)
	if err = json.Unmarshal(body, job); err != nil {
		fail(err.Error())
		return
	}
	if job.Topic == "" {
		fail("topic is empty")
		return
	}
	if job.Delay < 0 {
		job.Delay = 0
	}
	if job.TTR < 0 {
		job.TTR = 0
	}
	job.Id = uuid.New()

	jobBytes, err := json.Marshal(job)
	if err != nil {
		fail(err.Error())
		return
	}
	if err = poolAdd(job.Id, jobBytes); err != nil {
		fail(err.Error())
		return
	}
	if err = bucketAdd(job.Id, job.Delay); err != nil {
		// The job was stored but could not be scheduled. Remove it again so
		// the pool does not accumulate jobs that will never run. This is
		// best effort: the scheduling error is the one reported to the caller.
		_ = poolRem(job.Id)
		fail(err.Error())
		return
	}
	resp.Write(generateSuccessBody("", nil))
}
// Finish handles a request to mark a job as done.
//
// It deletes the job named by the "job_id" query parameter from the pool.
// A failure of the delete command is reported to the caller instead of being
// silently answered with success.
func Finish(resp http.ResponseWriter, req *http.Request) {
	jobId := req.URL.Query().Get("job_id")
	if err := poolRem(jobId); err != nil {
		resp.Write(generateFailureBody(err.Error()))
		return
	}
	resp.Write(generateSuccessBody("", nil))
}
func Pop(resp http.ResponseWriter, req *http.Request) {
topic := req.URL.Query().Get("topic")
jobId, err := queuePop(topic)
if err != nil {
resp.Write(generateFailureBody(err.Error()))
return
}
job := poolGetJob(jobId)
if job == nil {
resp.Write(generateFailureBody("no job at present"))
return
}
if job.TTR > 0{
if err = bucketAdd(job.Id, job.TTR); err != nil {
resp.Write(generateFailureBody(err.Error()))
return
}
}
resp.Write(generateSuccessBody("", job.Body))
}
// generateSuccessBody builds the JSON envelope for a successful response:
// code 200 with the given message and data. A marshalling error is ignored,
// in which case the returned slice is whatever json.Marshal produced.
func generateSuccessBody(msg string, data interface{}) []byte {
	envelope := Resp{Code: 200, Message: msg, Data: data}
	encoded, _ := json.Marshal(&envelope)
	return encoded
}
// generateFailureBody builds the JSON envelope for a failed response:
// code 500 with the given message and no data. A marshalling error is
// ignored, in which case the returned slice is whatever json.Marshal produced.
func generateFailureBody(msg string) []byte {
	envelope := Resp{Code: 500, Message: msg}
	encoded, _ := json.Marshal(&envelope)
	return encoded
}
// initBucketNumChan starts a goroutine that hands out bucket numbers in
// round-robin order (1, 2, ..., BucketSize, 1, ...) over an unbuffered
// channel, and publishes the receive side as bucketNumChan. The goroutine
// runs for the lifetime of the process.
func initBucketNumChan() {
	numbers := make(chan int)
	go func() {
		for next := 1; ; next++ {
			// Wrap around once the last bucket number has been handed out.
			if next > BucketSize {
				next = 1
			}
			numbers <- next
		}
	}()
	bucketNumChan = numbers
}
// main wires the delay queue server together: it creates the Redis client,
// starts the bucket pollers and the bucket-number generator, registers the
// HTTP handlers and serves on :8080 until the server stops.
func main() {
	// connect to redis
	rdb = redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	})

	// poll the delay buckets
	initTimers()
	initBucketNumChan()

	http.HandleFunc("/push", Push)
	http.HandleFunc("/pop", Pop)
	http.HandleFunc("/finish", Finish)

	// ListenAndServe only returns on failure (e.g. the port is already in
	// use); surface that instead of exiting silently with status 0.
	if err := http.ListenAndServe(":8080", nil); err != nil {
		panic(fmt.Errorf("serving http on :8080: %w", err))
	}
}