Contents

Build a Delay Queue with Redis

delay queue block diagram by YouZan

Job

// Job is the unit of work managed by the delay queue. It is serialized to
// JSON when stored in the job pool.
type Job struct {
	// Id identifies the job in the pool, the delay buckets and the ready queue.
	Id string `json:"id"`
	// Topic selects the ready queue the job is delivered to.
	Topic string `json:"topic"`
	// Delay is the number of seconds to wait before the job becomes ready.
	Delay int64 `json:"delay"`
	// TTR (time to run) is the number of seconds after a pop before the job
	// is put back into a delay bucket for redelivery; it guarantees reliability.
	TTR int64 `json:"ttr"`
	// Body carries the service data of the job.
	Body string `json:"body"`
}

Job Pool

Redis hash
job’s Id $ \rightarrow $ hash field (all jobs live in a single hash stored under the key `dq:pool`)
metadata of the job, serialized as JSON $ \rightarrow $ hash value

// poolAdd writes a job's serialized metadata into the job pool hash.
// jobId is used as the hash field and data as its value.
// It returns the error reported by the Redis command, if any.
func poolAdd(jobId string, data []byte) error {
	cmd := rdb.HSet(background, PoolKey, jobId, data)
	return cmd.Err()
}

// poolRem deletes the pool entry whose hash field is jobId.
// It returns the error reported by the Redis command, if any.
func poolRem(jobId string) error {
	cmd := rdb.HDel(background, PoolKey, jobId)
	return cmd.Err()
}

// poolGetJob looks up the metadata stored for jobId in the job pool and
// decodes it from JSON into a Job.
//
// It returns nil when the lookup fails for any reason (including a missing
// entry) or when the stored value is not valid JSON for a Job.
func poolGetJob(jobId string) *Job {
	raw, err := rdb.HGet(background, PoolKey, jobId).Result()
	if err != nil {
		return nil
	}

	var decoded Job
	if json.Unmarshal([]byte(raw), &decoded) != nil {
		return nil
	}
	return &decoded
}

Ready Queue

Redis list
job’s Id $ \rightarrow $ list member

func queuePop(topic string) (string, error) {
	key := fmt.Sprintf(QueueKeyPrefix, topic)
	return rdb.RPop(background, key).Result()
}

func queueAdd(topic, jobId string) error {
	queueKey := fmt.Sprintf(QueueKeyPrefix, topic)
	return rdb.LPush(background, queueKey, jobId).Err()
}

Delay Bucket

Redis sorted set (zset)
job’s Id $ \rightarrow $ zset member
the job’s ready time, i.e. the Unix timestamp (in seconds) at which it was added plus its delay $ \rightarrow $ zset score

Jobs are spread over multiple buckets in round-robin order, and each bucket is polled by its own timer, which improves polling efficiency.

// bucketAdd schedules jobId in a delay bucket so that it becomes due
// jobDelay seconds from now.
//
// The bucket number is received from bucketNumChan, and the sorted-set score
// is the current Unix time plus jobDelay.
func bucketAdd(jobId string, jobDelay int64) error {
	bucket := <-bucketNumChan
	dueAt := time.Now().Unix() + jobDelay
	entry := &redis.Z{Score: float64(dueAt), Member: jobId}
	return rdb.ZAdd(background, fmt.Sprintf(BucketKeyPrefix, bucket), entry).Err()
}

func bucketRem(bucketNum int, jobId string) error {
	key := fmt.Sprintf(BucketKeyPrefix, bucketNum)
	return rdb.ZRem(background, key, jobId).Err()
}

// bucketGetFirst returns the first entry (rank 0) of the delay bucket
// numbered bucketNum together with its score, or nil when the query yields
// no entries. A failed query is not distinguished from an empty bucket.
func bucketGetFirst(bucketNum int) *redis.Z {
	cmd := rdb.ZRangeWithScores(background, fmt.Sprintf(BucketKeyPrefix, bucketNum), 0, 0)
	if entries := cmd.Val(); len(entries) > 0 {
		return &entries[0]
	}
	return nil
}

Timer

// initTimers creates one one-second ticker per delay bucket and starts a
// polling goroutine for each of them.
//
// Buckets are numbered 1..BucketSize: that is the range produced by
// initBucketNumChan and therefore the range bucketAdd writes to. Polling
// 0..BucketSize-1 instead would leave the last bucket unpolled, so its jobs
// would never reach the ready queue.
func initTimers() {
	timers = make([]*time.Ticker, BucketSize)
	for i := range timers {
		timers[i] = time.NewTicker(1 * time.Second)
		// timers is 0-indexed, bucket numbers are 1-based.
		go poolingBucket(timers[i], i+1)
	}
}

func poolingBucket(ticker *time.Ticker, bucketNum int){
	bucketKey := fmt.Sprintf(BucketKeyPrefix, bucketNum)
	for {
		select {
		case t := <-ticker.C:
			bucketItem := bucketGetFirst(bucketNum)
			if bucketItem == nil{
				// bucket is empty
				continue
			}

			// does not come the time to run
			if int64(bucketItem.Score) > t.Unix() {
				continue
			}

			jobId := bucketItem.Member.(string)
			job := poolGetJob(jobId)
			if job == nil {
				// job is not found in the pool (usually is deleted)
				bucketRem(bucketNum, jobId)
				continue
			}

			if err := queueAdd(job.Topic, job.Id); err != nil {
				continue
			}

			bucketRem(bucketNum, job.Id)
		}
	}
}

Delay Queue Server

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/go-basic/uuid"
	"github.com/go-redis/redis/v8"
	"io/ioutil"
	"net/http"
	"time"
)

const (
	// PoolKey is the Redis key of the hash that holds every job's metadata.
	PoolKey = "dq:pool"
	// QueueKeyPrefix is the format of a ready queue key; %s is the topic.
	QueueKeyPrefix = "dq:queue:%s"
	// BucketSize is the number of delay buckets.
	BucketSize = 5
	// BucketKeyPrefix is the format of a delay bucket key; %d is the bucket number.
	BucketKeyPrefix = "dq:bucket:%d"
)

var (
	// rdb is the shared Redis client; main assigns it at startup.
	rdb *redis.Client

	// timers holds one ticker per delay bucket; initTimers fills it.
	timers []*time.Ticker

	// bucketNumChan supplies the bucket number used by bucketAdd.
	bucketNumChan <-chan int

	// background is the context passed to every Redis command.
	background = context.Background()
)

// Resp is the JSON envelope written by every HTTP handler.
type Resp struct {
	// Code is 200 for a success body and 500 for a failure body.
	Code int `json:"code"`
	// Message is a human-readable description, e.g. an error text.
	Message string `json:"message"`
	// Data is the optional payload of a success body.
	Data interface{} `json:"data"`
}

// Push handles /push: it reads a JSON-encoded Job from the request body,
// assigns it a fresh Id, stores it in the job pool and schedules it in a
// delay bucket.
//
// Negative Delay and TTR values are clamped to 0; an empty topic is rejected.
// On success the generated job Id is returned in the "data" field of the
// response, so the caller can refer to the job later (e.g. for /finish).
// Failures are reported as a JSON body with code 500.
func Push(resp http.ResponseWriter, req *http.Request) {
	body, err := ioutil.ReadAll(req.Body)
	if err != nil {
		resp.Write(generateFailureBody(err.Error()))
		return
	}
	job := new(Job)
	if err = json.Unmarshal(body, job); err != nil {
		resp.Write(generateFailureBody(err.Error()))
		return
	}

	if job.Topic == "" {
		resp.Write(generateFailureBody("topic is empty"))
		return
	}
	if job.Delay < 0 {
		job.Delay = 0
	}
	if job.TTR < 0 {
		job.TTR = 0
	}

	// The Id is always generated here; any Id sent by the client is ignored.
	job.Id = uuid.New()
	jobBytes, err := json.Marshal(job)
	if err != nil {
		resp.Write(generateFailureBody(err.Error()))
		return
	}

	if err = poolAdd(job.Id, jobBytes); err != nil {
		resp.Write(generateFailureBody(err.Error()))
		return
	}

	if err = bucketAdd(job.Id, job.Delay); err != nil {
		// The job was never scheduled, so drop its pool entry instead of
		// leaving an orphan behind. Best effort: the scheduling error is the
		// one worth reporting, a failed cleanup cannot be handled further.
		_ = poolRem(job.Id)
		resp.Write(generateFailureBody(err.Error()))
		return
	}
	resp.Write(generateSuccessBody("", job.Id))
}

func Finish(resp http.ResponseWriter, req *http.Request) {
	jobId := req.URL.Query().Get("job_id")
	poolRem(jobId)
	resp.Write(generateSuccessBody("",nil))
}

func Pop(resp http.ResponseWriter, req *http.Request) {
	topic := req.URL.Query().Get("topic")

	jobId, err := queuePop(topic)
	if err != nil {
		resp.Write(generateFailureBody(err.Error()))
		return
	}

	job := poolGetJob(jobId)
	if job == nil {
		resp.Write(generateFailureBody("no job at present"))
		return
	}

	if job.TTR > 0{
		if err = bucketAdd(job.Id, job.TTR); err != nil {
			resp.Write(generateFailureBody(err.Error()))
			return
		}
	}

	resp.Write(generateSuccessBody("", job.Body))
}

// generateSuccessBody returns the JSON encoding of a Resp with code 200,
// the given message and the given data payload.
//
// If data cannot be encoded as JSON, a failure envelope (code 500) carrying
// the encoding error is returned instead of an empty body.
func generateSuccessBody(msg string, data interface{}) []byte {
	b, err := json.Marshal(&Resp{
		Code:    200,
		Message: msg,
		Data:    data,
	})
	if err != nil {
		// The fallback holds only an int and a string, so encoding it cannot
		// fail; its error is deliberately ignored.
		b, _ = json.Marshal(&Resp{
			Code:    500,
			Message: err.Error(),
		})
	}
	return b
}

// generateFailureBody returns the JSON encoding of a Resp with code 500 and
// the given message; the data field is left unset.
func generateFailureBody(msg string) []byte {
	failure := Resp{Code: 500, Message: msg}
	// The envelope holds only an int, a string and a nil payload; the
	// encoding error is ignored as in every other response helper.
	encoded, _ := json.Marshal(&failure)
	return encoded
}

func initBucketNumChan() {
	c := make(chan int)
	go func() {
		i := 1
		for {
			c <- i
			i++
			if i > BucketSize {
				i = 1
			}
		}
	}()
	bucketNumChan = c
}

// main wires the delay queue server together: it creates the Redis client,
// starts the bucket pollers and the bucket-number generator, registers the
// HTTP handlers and serves on :8080.
func main() {
	// connect to redis
	rdb = redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	})

	// start polling the buckets and the generator that feeds bucketAdd
	initTimers()
	initBucketNumChan()

	http.HandleFunc("/push", Push)
	http.HandleFunc("/pop", Pop)
	http.HandleFunc("/finish", Finish)

	// ListenAndServe only returns on failure (e.g. the port is already in
	// use). Surface the error instead of exiting silently with status 0.
	if err := http.ListenAndServe(":8080", nil); err != nil {
		panic(fmt.Errorf("http server on :8080: %w", err))
	}
}