forked from kevadesu/forgejo
Merge branch 'forgejo' into forgejo-federated-star
This commit is contained in:
commit
4c87b0b3ee
146 changed files with 11442 additions and 1311 deletions
|
@ -27,6 +27,7 @@ type GrepResult struct {
|
|||
type GrepOptions struct {
|
||||
RefName string
|
||||
MaxResultLimit int
|
||||
MatchesPerFile int
|
||||
ContextLineNumber int
|
||||
IsFuzzy bool
|
||||
PathSpec []setting.Glob
|
||||
|
@ -54,6 +55,9 @@ func GrepSearch(ctx context.Context, repo *Repository, search string, opts GrepO
|
|||
var results []*GrepResult
|
||||
cmd := NewCommand(ctx, "grep", "--null", "--break", "--heading", "--fixed-strings", "--line-number", "--ignore-case", "--full-name")
|
||||
cmd.AddOptionValues("--context", fmt.Sprint(opts.ContextLineNumber))
|
||||
if opts.MatchesPerFile > 0 {
|
||||
cmd.AddOptionValues("--max-count", fmt.Sprint(opts.MatchesPerFile))
|
||||
}
|
||||
if opts.IsFuzzy {
|
||||
words := strings.Fields(search)
|
||||
for _, word := range words {
|
||||
|
|
|
@ -44,6 +44,31 @@ func TestGrepSearch(t *testing.T) {
|
|||
},
|
||||
}, res)
|
||||
|
||||
res, err = GrepSearch(context.Background(), repo, "world", GrepOptions{MatchesPerFile: 1})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, []*GrepResult{
|
||||
{
|
||||
Filename: "i-am-a-python.p",
|
||||
LineNumbers: []int{1},
|
||||
LineCodes: []string{"## This is a simple file to do a hello world"},
|
||||
},
|
||||
{
|
||||
Filename: "java-hello/main.java",
|
||||
LineNumbers: []int{1},
|
||||
LineCodes: []string{"public class HelloWorld"},
|
||||
},
|
||||
{
|
||||
Filename: "main.vendor.java",
|
||||
LineNumbers: []int{1},
|
||||
LineCodes: []string{"public class HelloWorld"},
|
||||
},
|
||||
{
|
||||
Filename: "python-hello/hello.py",
|
||||
LineNumbers: []int{1},
|
||||
LineCodes: []string{"## This is a simple file to do a hello world"},
|
||||
},
|
||||
}, res)
|
||||
|
||||
res, err = GrepSearch(context.Background(), repo, "no-such-content", GrepOptions{})
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, res, 0)
|
||||
|
|
|
@ -19,14 +19,34 @@ type baseRedis struct {
|
|||
client redis.UniversalClient
|
||||
isUnique bool
|
||||
cfg *BaseConfig
|
||||
prefix string
|
||||
|
||||
mu sync.Mutex // the old implementation is not thread-safe, the queue operation and set operation should be protected together
|
||||
}
|
||||
|
||||
var _ baseQueue = (*baseRedis)(nil)
|
||||
|
||||
func newBaseRedisGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
|
||||
client := nosql.GetManager().GetRedisClient(cfg.ConnStr)
|
||||
func newBaseRedisGeneric(cfg *BaseConfig, unique bool, client redis.UniversalClient) (baseQueue, error) {
|
||||
if client == nil {
|
||||
client = nosql.GetManager().GetRedisClient(cfg.ConnStr)
|
||||
}
|
||||
|
||||
prefix := ""
|
||||
uri := nosql.ToRedisURI(cfg.ConnStr)
|
||||
|
||||
for key, value := range uri.Query() {
|
||||
switch key {
|
||||
case "prefix":
|
||||
if len(value) > 0 {
|
||||
prefix = value[0]
|
||||
|
||||
// As we are not checking any other values, if we found this one, we can
|
||||
// exit from the loop.
|
||||
// If a new key check is required, remove this break.
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
for i := 0; i < 10; i++ {
|
||||
|
@ -41,15 +61,19 @@ func newBaseRedisGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return &baseRedis{cfg: cfg, client: client, isUnique: unique}, nil
|
||||
return &baseRedis{cfg: cfg, client: client, isUnique: unique, prefix: prefix}, nil
|
||||
}
|
||||
|
||||
func newBaseRedisSimple(cfg *BaseConfig) (baseQueue, error) {
|
||||
return newBaseRedisGeneric(cfg, false)
|
||||
return newBaseRedisGeneric(cfg, false, nil)
|
||||
}
|
||||
|
||||
func newBaseRedisUnique(cfg *BaseConfig) (baseQueue, error) {
|
||||
return newBaseRedisGeneric(cfg, true)
|
||||
return newBaseRedisGeneric(cfg, true, nil)
|
||||
}
|
||||
|
||||
func (q *baseRedis) prefixedName(name string) string {
|
||||
return q.prefix + name
|
||||
}
|
||||
|
||||
func (q *baseRedis) PushItem(ctx context.Context, data []byte) error {
|
||||
|
@ -57,7 +81,7 @@ func (q *baseRedis) PushItem(ctx context.Context, data []byte) error {
|
|||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
cnt, err := q.client.LLen(ctx, q.cfg.QueueFullName).Result()
|
||||
cnt, err := q.client.LLen(ctx, q.prefixedName(q.cfg.QueueFullName)).Result()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -66,7 +90,7 @@ func (q *baseRedis) PushItem(ctx context.Context, data []byte) error {
|
|||
}
|
||||
|
||||
if q.isUnique {
|
||||
added, err := q.client.SAdd(ctx, q.cfg.SetFullName, data).Result()
|
||||
added, err := q.client.SAdd(ctx, q.prefixedName(q.cfg.SetFullName), data).Result()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -74,7 +98,7 @@ func (q *baseRedis) PushItem(ctx context.Context, data []byte) error {
|
|||
return false, ErrAlreadyInQueue
|
||||
}
|
||||
}
|
||||
return false, q.client.RPush(ctx, q.cfg.QueueFullName, data).Err()
|
||||
return false, q.client.RPush(ctx, q.prefixedName(q.cfg.QueueFullName), data).Err()
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -83,7 +107,7 @@ func (q *baseRedis) PopItem(ctx context.Context) ([]byte, error) {
|
|||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
data, err = q.client.LPop(ctx, q.cfg.QueueFullName).Bytes()
|
||||
data, err = q.client.LPop(ctx, q.prefixedName(q.cfg.QueueFullName)).Bytes()
|
||||
if err == redis.Nil {
|
||||
return true, nil, nil
|
||||
}
|
||||
|
@ -92,7 +116,7 @@ func (q *baseRedis) PopItem(ctx context.Context) ([]byte, error) {
|
|||
}
|
||||
if q.isUnique {
|
||||
// the data has been popped, even if there is any error we can't do anything
|
||||
_ = q.client.SRem(ctx, q.cfg.SetFullName, data).Err()
|
||||
_ = q.client.SRem(ctx, q.prefixedName(q.cfg.SetFullName), data).Err()
|
||||
}
|
||||
return false, data, err
|
||||
})
|
||||
|
@ -104,13 +128,13 @@ func (q *baseRedis) HasItem(ctx context.Context, data []byte) (bool, error) {
|
|||
if !q.isUnique {
|
||||
return false, nil
|
||||
}
|
||||
return q.client.SIsMember(ctx, q.cfg.SetFullName, data).Result()
|
||||
return q.client.SIsMember(ctx, q.prefixedName(q.cfg.SetFullName), data).Result()
|
||||
}
|
||||
|
||||
func (q *baseRedis) Len(ctx context.Context) (int, error) {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
cnt, err := q.client.LLen(ctx, q.cfg.QueueFullName).Result()
|
||||
cnt, err := q.client.LLen(ctx, q.prefixedName(q.cfg.QueueFullName)).Result()
|
||||
return int(cnt), err
|
||||
}
|
||||
|
||||
|
@ -124,10 +148,10 @@ func (q *baseRedis) RemoveAll(ctx context.Context) error {
|
|||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
c1 := q.client.Del(ctx, q.cfg.QueueFullName)
|
||||
c1 := q.client.Del(ctx, q.prefixedName(q.cfg.QueueFullName))
|
||||
// the "set" must be cleared after the "list" because there is no transaction.
|
||||
// it's better to have duplicate items than losing items.
|
||||
c2 := q.client.Del(ctx, q.cfg.SetFullName)
|
||||
c2 := q.client.Del(ctx, q.prefixedName(q.cfg.SetFullName))
|
||||
if c1.Err() != nil {
|
||||
return c1.Err()
|
||||
}
|
||||
|
|
|
@ -5,67 +5,134 @@ package queue
|
|||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"os/exec"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/nosql"
|
||||
"code.gitea.io/gitea/modules/queue/mock"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"go.uber.org/mock/gomock"
|
||||
)
|
||||
|
||||
func waitRedisReady(conn string, dur time.Duration) (ready bool) {
|
||||
ctxTimed, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||
defer cancel()
|
||||
for t := time.Now(); ; time.Sleep(50 * time.Millisecond) {
|
||||
ret := nosql.GetManager().GetRedisClient(conn).Ping(ctxTimed)
|
||||
if ret.Err() == nil {
|
||||
return true
|
||||
}
|
||||
if time.Since(t) > dur {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
type baseRedisUnitTestSuite struct {
|
||||
suite.Suite
|
||||
|
||||
func redisServerCmd(t *testing.T) *exec.Cmd {
|
||||
redisServerProg, err := exec.LookPath("redis-server")
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
c := &exec.Cmd{
|
||||
Path: redisServerProg,
|
||||
Args: []string{redisServerProg, "--bind", "127.0.0.1", "--port", "6379"},
|
||||
Dir: t.TempDir(),
|
||||
Stdin: os.Stdin,
|
||||
Stdout: os.Stdout,
|
||||
Stderr: os.Stderr,
|
||||
}
|
||||
return c
|
||||
mockController *gomock.Controller
|
||||
}
|
||||
|
||||
func TestBaseRedis(t *testing.T) {
|
||||
var redisServer *exec.Cmd
|
||||
defer func() {
|
||||
if redisServer != nil {
|
||||
_ = redisServer.Process.Signal(os.Interrupt)
|
||||
_ = redisServer.Wait()
|
||||
}
|
||||
}()
|
||||
if !waitRedisReady("redis://127.0.0.1:6379/0", 0) {
|
||||
redisServer = redisServerCmd(t)
|
||||
if true {
|
||||
t.Skip("redis-server not found in Forgejo test yet")
|
||||
return
|
||||
}
|
||||
assert.NoError(t, redisServer.Start())
|
||||
if !assert.True(t, waitRedisReady("redis://127.0.0.1:6379/0", 5*time.Second), "start redis-server") {
|
||||
return
|
||||
}
|
||||
suite.Run(t, &baseRedisUnitTestSuite{})
|
||||
}
|
||||
|
||||
func (suite *baseRedisUnitTestSuite) SetupSuite() {
|
||||
suite.mockController = gomock.NewController(suite.T())
|
||||
}
|
||||
|
||||
func (suite *baseRedisUnitTestSuite) TestBasic() {
|
||||
queueName := "test-queue"
|
||||
testCases := []struct {
|
||||
Name string
|
||||
ConnectionString string
|
||||
QueueName string
|
||||
Unique bool
|
||||
}{
|
||||
{
|
||||
Name: "unique",
|
||||
ConnectionString: "redis://127.0.0.1/0",
|
||||
QueueName: queueName,
|
||||
Unique: true,
|
||||
},
|
||||
{
|
||||
Name: "non-unique",
|
||||
ConnectionString: "redis://127.0.0.1/0",
|
||||
QueueName: queueName,
|
||||
Unique: false,
|
||||
},
|
||||
{
|
||||
Name: "unique with prefix",
|
||||
ConnectionString: "redis://127.0.0.1/0?prefix=forgejo:queue:",
|
||||
QueueName: "forgejo:queue:" + queueName,
|
||||
Unique: true,
|
||||
},
|
||||
{
|
||||
Name: "non-unique with prefix",
|
||||
ConnectionString: "redis://127.0.0.1/0?prefix=forgejo:queue:",
|
||||
QueueName: "forgejo:queue:" + queueName,
|
||||
Unique: false,
|
||||
},
|
||||
}
|
||||
|
||||
testQueueBasic(t, newBaseRedisSimple, toBaseConfig("baseRedis", setting.QueueSettings{Length: 10}), false)
|
||||
testQueueBasic(t, newBaseRedisUnique, toBaseConfig("baseRedisUnique", setting.QueueSettings{Length: 10}), true)
|
||||
for _, testCase := range testCases {
|
||||
suite.Run(testCase.Name, func() {
|
||||
queueSettings := setting.QueueSettings{
|
||||
Length: 10,
|
||||
ConnStr: testCase.ConnectionString,
|
||||
}
|
||||
|
||||
// Configure expectations.
|
||||
mockRedisStore := mock.NewInMemoryMockRedis()
|
||||
redisClient := mock.NewMockUniversalClient(suite.mockController)
|
||||
|
||||
redisClient.EXPECT().
|
||||
Ping(gomock.Any()).
|
||||
Times(1).
|
||||
Return(&redis.StatusCmd{})
|
||||
redisClient.EXPECT().
|
||||
LLen(gomock.Any(), testCase.QueueName).
|
||||
Times(1).
|
||||
DoAndReturn(mockRedisStore.LLen)
|
||||
redisClient.EXPECT().
|
||||
LPop(gomock.Any(), testCase.QueueName).
|
||||
Times(1).
|
||||
DoAndReturn(mockRedisStore.LPop)
|
||||
redisClient.EXPECT().
|
||||
RPush(gomock.Any(), testCase.QueueName, gomock.Any()).
|
||||
Times(1).
|
||||
DoAndReturn(mockRedisStore.RPush)
|
||||
|
||||
if testCase.Unique {
|
||||
redisClient.EXPECT().
|
||||
SAdd(gomock.Any(), testCase.QueueName+"_unique", gomock.Any()).
|
||||
Times(1).
|
||||
DoAndReturn(mockRedisStore.SAdd)
|
||||
redisClient.EXPECT().
|
||||
SRem(gomock.Any(), testCase.QueueName+"_unique", gomock.Any()).
|
||||
Times(1).
|
||||
DoAndReturn(mockRedisStore.SRem)
|
||||
redisClient.EXPECT().
|
||||
SIsMember(gomock.Any(), testCase.QueueName+"_unique", gomock.Any()).
|
||||
Times(2).
|
||||
DoAndReturn(mockRedisStore.SIsMember)
|
||||
}
|
||||
|
||||
client, err := newBaseRedisGeneric(
|
||||
toBaseConfig(queueName, queueSettings),
|
||||
testCase.Unique,
|
||||
redisClient,
|
||||
)
|
||||
suite.Require().NoError(err)
|
||||
|
||||
ctx := context.Background()
|
||||
expectedContent := []byte("test")
|
||||
|
||||
suite.Require().NoError(client.PushItem(ctx, expectedContent))
|
||||
|
||||
found, err := client.HasItem(ctx, expectedContent)
|
||||
suite.Require().NoError(err)
|
||||
if testCase.Unique {
|
||||
suite.True(found)
|
||||
} else {
|
||||
suite.False(found)
|
||||
}
|
||||
|
||||
found, err = client.HasItem(ctx, []byte("not found content"))
|
||||
suite.Require().NoError(err)
|
||||
suite.False(found)
|
||||
|
||||
content, err := client.PopItem(ctx)
|
||||
suite.Require().NoError(err)
|
||||
suite.Equal(expectedContent, content)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
133
modules/queue/base_redis_with_server_test.go
Normal file
133
modules/queue/base_redis_with_server_test.go
Normal file
|
@ -0,0 +1,133 @@
|
|||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"os/exec"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/nosql"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
const defaultTestRedisServer = "127.0.0.1:6379"
|
||||
|
||||
type baseRedisWithServerTestSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func TestBaseRedisWithServer(t *testing.T) {
|
||||
suite.Run(t, &baseRedisWithServerTestSuite{})
|
||||
}
|
||||
|
||||
func (suite *baseRedisWithServerTestSuite) TestNormal() {
|
||||
redisAddress := "redis://" + suite.testRedisHost() + "/0"
|
||||
queueSettings := setting.QueueSettings{
|
||||
Length: 10,
|
||||
ConnStr: redisAddress,
|
||||
}
|
||||
|
||||
redisServer, accessible := suite.startRedisServer(redisAddress)
|
||||
|
||||
// If it's accessible, but redisServer command is nil, that means we are using
|
||||
// an already running redis server.
|
||||
if redisServer == nil && !accessible {
|
||||
suite.T().Skip("redis-server not found in Forgejo test yet")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if redisServer != nil {
|
||||
_ = redisServer.Process.Signal(os.Interrupt)
|
||||
_ = redisServer.Wait()
|
||||
}
|
||||
}()
|
||||
|
||||
testQueueBasic(suite.T(), newBaseRedisSimple, toBaseConfig("baseRedis", queueSettings), false)
|
||||
testQueueBasic(suite.T(), newBaseRedisUnique, toBaseConfig("baseRedisUnique", queueSettings), true)
|
||||
}
|
||||
|
||||
func (suite *baseRedisWithServerTestSuite) TestWithPrefix() {
|
||||
redisAddress := "redis://" + suite.testRedisHost() + "/0?prefix=forgejo:queue:"
|
||||
queueSettings := setting.QueueSettings{
|
||||
Length: 10,
|
||||
ConnStr: redisAddress,
|
||||
}
|
||||
|
||||
redisServer, accessible := suite.startRedisServer(redisAddress)
|
||||
|
||||
// If it's accessible, but redisServer command is nil, that means we are using
|
||||
// an already running redis server.
|
||||
if redisServer == nil && !accessible {
|
||||
suite.T().Skip("redis-server not found in Forgejo test yet")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if redisServer != nil {
|
||||
_ = redisServer.Process.Signal(os.Interrupt)
|
||||
_ = redisServer.Wait()
|
||||
}
|
||||
}()
|
||||
|
||||
testQueueBasic(suite.T(), newBaseRedisSimple, toBaseConfig("baseRedis", queueSettings), false)
|
||||
testQueueBasic(suite.T(), newBaseRedisUnique, toBaseConfig("baseRedisUnique", queueSettings), true)
|
||||
}
|
||||
|
||||
func (suite *baseRedisWithServerTestSuite) startRedisServer(address string) (*exec.Cmd, bool) {
|
||||
var redisServer *exec.Cmd
|
||||
|
||||
if !suite.waitRedisReady(address, 0) {
|
||||
redisServerProg, err := exec.LookPath("redis-server")
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
redisServer = &exec.Cmd{
|
||||
Path: redisServerProg,
|
||||
Args: []string{redisServerProg, "--bind", "127.0.0.1", "--port", "6379"},
|
||||
Dir: suite.T().TempDir(),
|
||||
Stdin: os.Stdin,
|
||||
Stdout: os.Stdout,
|
||||
Stderr: os.Stderr,
|
||||
}
|
||||
|
||||
suite.Require().NoError(redisServer.Start())
|
||||
|
||||
if !suite.True(suite.waitRedisReady(address, 5*time.Second), "start redis-server") {
|
||||
// Return with redis server even if it's not available. It was started,
|
||||
// even if it's not reachable for any reasons, it's still started, the
|
||||
// parent will close it.
|
||||
return redisServer, false
|
||||
}
|
||||
}
|
||||
|
||||
return redisServer, true
|
||||
}
|
||||
|
||||
func (suite *baseRedisWithServerTestSuite) waitRedisReady(conn string, dur time.Duration) (ready bool) {
|
||||
ctxTimed, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||
defer cancel()
|
||||
for t := time.Now(); ; time.Sleep(50 * time.Millisecond) {
|
||||
ret := nosql.GetManager().GetRedisClient(conn).Ping(ctxTimed)
|
||||
if ret.Err() == nil {
|
||||
return true
|
||||
}
|
||||
if time.Since(t) > dur {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *baseRedisWithServerTestSuite) testRedisHost() string {
|
||||
value := os.Getenv("TEST_REDIS_SERVER")
|
||||
if value != "" {
|
||||
return value
|
||||
}
|
||||
|
||||
return defaultTestRedisServer
|
||||
}
|
133
modules/queue/mock/inmemorymockredis.go
Normal file
133
modules/queue/mock/inmemorymockredis.go
Normal file
|
@ -0,0 +1,133 @@
|
|||
package mock
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
redis "github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// InMemoryMockRedis is a very primitive in-memory redis-like feature. The main
|
||||
// purpose of this struct is to give some backend to mocked unit tests.
|
||||
type InMemoryMockRedis struct {
|
||||
queues map[string][][]byte
|
||||
}
|
||||
|
||||
func NewInMemoryMockRedis() InMemoryMockRedis {
|
||||
return InMemoryMockRedis{
|
||||
queues: map[string][][]byte{},
|
||||
}
|
||||
}
|
||||
|
||||
func (r *InMemoryMockRedis) LLen(ctx context.Context, key string) *redis.IntCmd {
|
||||
cmd := redis.NewIntCmd(ctx)
|
||||
cmd.SetVal(int64(len(r.queues[key])))
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (r *InMemoryMockRedis) SAdd(ctx context.Context, key string, content []byte) *redis.IntCmd {
|
||||
cmd := redis.NewIntCmd(ctx)
|
||||
|
||||
for _, value := range r.queues[key] {
|
||||
if string(value) == string(content) {
|
||||
cmd.SetVal(0)
|
||||
|
||||
return cmd
|
||||
}
|
||||
}
|
||||
|
||||
r.queues[key] = append(r.queues[key], content)
|
||||
|
||||
cmd.SetVal(1)
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (r *InMemoryMockRedis) RPush(ctx context.Context, key string, content []byte) *redis.IntCmd {
|
||||
cmd := redis.NewIntCmd(ctx)
|
||||
|
||||
r.queues[key] = append(r.queues[key], content)
|
||||
|
||||
cmd.SetVal(1)
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (r *InMemoryMockRedis) LPop(ctx context.Context, key string) *redis.StringCmd {
|
||||
cmd := redis.NewStringCmd(ctx)
|
||||
|
||||
queue, found := r.queues[key]
|
||||
if !found {
|
||||
cmd.SetErr(errors.New("queue not found"))
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
if len(queue) < 1 {
|
||||
cmd.SetErr(errors.New("queue is empty"))
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
value, rest := queue[0], queue[1:]
|
||||
|
||||
r.queues[key] = rest
|
||||
|
||||
cmd.SetVal(string(value))
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (r *InMemoryMockRedis) SRem(ctx context.Context, key string, content []byte) *redis.IntCmd {
|
||||
cmd := redis.NewIntCmd(ctx)
|
||||
|
||||
queue, found := r.queues[key]
|
||||
if !found {
|
||||
cmd.SetErr(errors.New("queue not found"))
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
if len(queue) < 1 {
|
||||
cmd.SetErr(errors.New("queue is empty"))
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
newList := [][]byte{}
|
||||
|
||||
for _, value := range queue {
|
||||
if string(value) != string(content) {
|
||||
newList = append(newList, value)
|
||||
}
|
||||
}
|
||||
|
||||
r.queues[key] = newList
|
||||
|
||||
cmd.SetVal(1)
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (r *InMemoryMockRedis) SIsMember(ctx context.Context, key string, content []byte) *redis.BoolCmd {
|
||||
cmd := redis.NewBoolCmd(ctx)
|
||||
|
||||
queue, found := r.queues[key]
|
||||
if !found {
|
||||
cmd.SetErr(errors.New("queue not found"))
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
for _, value := range queue {
|
||||
if string(value) == string(content) {
|
||||
cmd.SetVal(true)
|
||||
|
||||
return cmd
|
||||
}
|
||||
}
|
||||
|
||||
cmd.SetVal(false)
|
||||
|
||||
return cmd
|
||||
}
|
7168
modules/queue/mock/redisuniversalclient.go
Normal file
7168
modules/queue/mock/redisuniversalclient.go
Normal file
File diff suppressed because it is too large
Load diff
|
@ -192,16 +192,24 @@ func (q *WorkerPoolQueue[T]) ShutdownWait(timeout time.Duration) {
|
|||
<-q.shutdownDone
|
||||
}
|
||||
|
||||
func getNewQueueFn(t string) (string, func(cfg *BaseConfig, unique bool) (baseQueue, error)) {
|
||||
func getNewQueue(t string, cfg *BaseConfig, unique bool) (string, baseQueue, error) {
|
||||
switch t {
|
||||
case "dummy", "immediate":
|
||||
return t, newBaseDummy
|
||||
queue, err := newBaseDummy(cfg, unique)
|
||||
|
||||
return t, queue, err
|
||||
case "channel":
|
||||
return t, newBaseChannelGeneric
|
||||
queue, err := newBaseChannelGeneric(cfg, unique)
|
||||
|
||||
return t, queue, err
|
||||
case "redis":
|
||||
return t, newBaseRedisGeneric
|
||||
queue, err := newBaseRedisGeneric(cfg, unique, nil)
|
||||
|
||||
return t, queue, err
|
||||
default: // level(leveldb,levelqueue,persistable-channel)
|
||||
return "level", newBaseLevelQueueGeneric
|
||||
queue, err := newBaseLevelQueueGeneric(cfg, unique)
|
||||
|
||||
return "level", queue, err
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -217,14 +225,14 @@ func NewWorkerPoolQueueWithContext[T any](ctx context.Context, name string, queu
|
|||
|
||||
var w WorkerPoolQueue[T]
|
||||
var err error
|
||||
queueType, newQueueFn := getNewQueueFn(queueSetting.Type)
|
||||
w.baseQueueType = queueType
|
||||
|
||||
w.baseConfig = toBaseConfig(name, queueSetting)
|
||||
w.baseQueue, err = newQueueFn(w.baseConfig, unique)
|
||||
|
||||
w.baseQueueType, w.baseQueue, err = getNewQueue(queueSetting.Type, w.baseConfig, unique)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.Trace("Created queue %q of type %q", name, queueType)
|
||||
log.Trace("Created queue %q of type %q", name, w.baseQueueType)
|
||||
|
||||
w.ctxRun, _, w.ctxRunCancel = process.GetManager().AddTypedContext(ctx, "Queue: "+w.GetName(), process.SystemProcessType, false)
|
||||
w.batchChan = make(chan []T)
|
||||
|
|
|
@ -124,16 +124,15 @@ func loadOAuth2From(rootCfg ConfigProvider) {
|
|||
OAuth2.Enabled = sec.Key("ENABLE").MustBool(OAuth2.Enabled)
|
||||
}
|
||||
|
||||
if !OAuth2.Enabled {
|
||||
return
|
||||
}
|
||||
|
||||
jwtSecretBase64 := loadSecret(sec, "JWT_SECRET_URI", "JWT_SECRET")
|
||||
|
||||
if !filepath.IsAbs(OAuth2.JWTSigningPrivateKeyFile) {
|
||||
OAuth2.JWTSigningPrivateKeyFile = filepath.Join(AppDataPath, OAuth2.JWTSigningPrivateKeyFile)
|
||||
}
|
||||
|
||||
// FIXME: at the moment, no matter oauth2 is enabled or not, it must generate a "oauth2 JWT_SECRET"
|
||||
// Because this secret is also used as GeneralTokenSigningSecret (as a quick not-that-breaking fix for some legacy problems).
|
||||
// Including: CSRF token, account validation token, etc ...
|
||||
// In main branch, the signing token should be refactored (eg: one unique for LFS/OAuth2/etc ...)
|
||||
jwtSecretBase64 := loadSecret(sec, "JWT_SECRET_URI", "JWT_SECRET")
|
||||
if InstallLock {
|
||||
jwtSecretBytes, err := generate.DecodeJwtSecret(jwtSecretBase64)
|
||||
if err != nil {
|
||||
|
@ -155,8 +154,6 @@ func loadOAuth2From(rootCfg ConfigProvider) {
|
|||
}
|
||||
}
|
||||
|
||||
// generalSigningSecret is used as container for a []byte value
|
||||
// instead of an additional mutex, we use CompareAndSwap func to change the value thread save
|
||||
var generalSigningSecret atomic.Pointer[[]byte]
|
||||
|
||||
func GetGeneralTokenSigningSecret() []byte {
|
||||
|
@ -164,11 +161,9 @@ func GetGeneralTokenSigningSecret() []byte {
|
|||
if old == nil || len(*old) == 0 {
|
||||
jwtSecret, _, err := generate.NewJwtSecret()
|
||||
if err != nil {
|
||||
log.Fatal("Unable to generate general JWT secret: %s", err.Error())
|
||||
log.Fatal("Unable to generate general JWT secret: %v", err)
|
||||
}
|
||||
if generalSigningSecret.CompareAndSwap(old, &jwtSecret) {
|
||||
// FIXME: in main branch, the signing token should be refactored (eg: one unique for LFS/OAuth2/etc ...)
|
||||
log.Warn("OAuth2 is not enabled, unable to use a persistent signing secret, a new one is generated, which is not persistent between restarts and cluster nodes")
|
||||
return jwtSecret
|
||||
}
|
||||
return *generalSigningSecret.Load()
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
package setting
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"code.gitea.io/gitea/modules/generate"
|
||||
|
@ -14,7 +15,7 @@ import (
|
|||
|
||||
func TestGetGeneralSigningSecret(t *testing.T) {
|
||||
// when there is no general signing secret, it should be generated, and keep the same value
|
||||
assert.Nil(t, generalSigningSecret.Load())
|
||||
generalSigningSecret.Store(nil)
|
||||
s1 := GetGeneralTokenSigningSecret()
|
||||
assert.NotNil(t, s1)
|
||||
s2 := GetGeneralTokenSigningSecret()
|
||||
|
@ -32,3 +33,28 @@ JWT_SECRET = BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
|
|||
assert.Len(t, actual, 32)
|
||||
assert.EqualValues(t, expected, actual)
|
||||
}
|
||||
|
||||
func TestGetGeneralSigningSecretSave(t *testing.T) {
|
||||
defer test.MockVariableValue(&InstallLock, true)()
|
||||
|
||||
old := GetGeneralTokenSigningSecret()
|
||||
assert.Len(t, old, 32)
|
||||
|
||||
tmpFile := t.TempDir() + "/app.ini"
|
||||
_ = os.WriteFile(tmpFile, nil, 0o644)
|
||||
cfg, _ := NewConfigProviderFromFile(tmpFile)
|
||||
loadOAuth2From(cfg)
|
||||
generated := GetGeneralTokenSigningSecret()
|
||||
assert.Len(t, generated, 32)
|
||||
assert.NotEqual(t, old, generated)
|
||||
|
||||
generalSigningSecret.Store(nil)
|
||||
cfg, _ = NewConfigProviderFromFile(tmpFile)
|
||||
loadOAuth2From(cfg)
|
||||
again := GetGeneralTokenSigningSecret()
|
||||
assert.Equal(t, generated, again)
|
||||
|
||||
iniContent, err := os.ReadFile(tmpFile)
|
||||
assert.NoError(t, err)
|
||||
assert.Contains(t, string(iniContent), "JWT_SECRET = ")
|
||||
}
|
||||
|
|
|
@ -57,8 +57,9 @@ type DeleteLabelsOption struct {
|
|||
|
||||
// IssueLabelsOption a collection of labels
|
||||
type IssueLabelsOption struct {
|
||||
// list of label IDs
|
||||
Labels []int64 `json:"labels"`
|
||||
// Labels can be a list of integers representing label IDs
|
||||
// or a list of strings representing label names
|
||||
Labels []any `json:"labels"`
|
||||
// swagger:strfmt date-time
|
||||
Updated *time.Time `json:"updated_at"`
|
||||
}
|
||||
|
|
|
@ -230,7 +230,7 @@ type EditRepoOption struct {
|
|||
Archived *bool `json:"archived,omitempty"`
|
||||
// set to a string like `8h30m0s` to set the mirror interval time
|
||||
MirrorInterval *string `json:"mirror_interval,omitempty"`
|
||||
// enable prune - remove obsolete remote-tracking references
|
||||
// enable prune - remove obsolete remote-tracking references when mirroring
|
||||
EnablePrune *bool `json:"enable_prune,omitempty"`
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue