Skip to content

Commit

Permalink
Merge branch 'main' into support/lorry_api_refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
xuriwuyun committed Oct 23, 2023
2 parents 954aeaa + a150515 commit 2532217
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 35 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/cicd-pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ jobs:
run: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.54.2
- name: make test ${{ matrix.ops }}
- name: make ${{ matrix.ops }}
if: ${{ ! contains(matrix.ops, '/') }}
run: |
make ${{ matrix.ops }}
- name: make ${{ matrix.ops }}
- name: make test ${{ matrix.ops }}
if: ${{ contains(matrix.ops, '/') }}
run: |
make test TEST_PACKAGES=./${{ matrix.ops }}/...
Expand Down
14 changes: 14 additions & 0 deletions .github/workflows/e2e-kbcli.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ jobs:
e2e:
name: ${{ inputs.CLOUD_PROVIDER }}
needs: check
if: ${{ inputs.CLOUD_PROVIDER != 'k3s' }}
uses: apecloud/apecloud-cd/.github/workflows/kbcli-test-k8s.yml@main
with:
CLOUD_PROVIDER: "${{ inputs.CLOUD_PROVIDER }}"
Expand All @@ -100,3 +101,16 @@ jobs:
ARGS: "${{ inputs.ARGS }}"
TEST_TYPE: "${{ inputs.TEST_TYPE }}"
secrets: inherit

k3s:
needs: check
if: ${{ inputs.CLOUD_PROVIDER == 'k3s' }}
uses: apecloud/apecloud-cd/.github/workflows/kbcli-test-k3s.yml@main
with:
KB_VERSION: "${{ needs.check.outputs.release-version }}"
KB_PRE_VERSION: "${{ inputs.PRE_VERSION }}"
CLUSTER_VERSION: "${{ inputs.CLUSTER_VERSION }}"
BRANCH_NAME: "${{ inputs.BRANCH_NAME }}"
ARGS: "${{ inputs.ARGS }}"
TEST_TYPE: "${{ inputs.TEST_TYPE }}"
secrets: inherit
5 changes: 3 additions & 2 deletions .github/workflows/release-version.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ jobs:
run: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.54.2
- name: make test ${{ matrix.ops }}
- name: make ${{ matrix.ops }}
if: ${{ ! contains(matrix.ops, '/') }}
run: |
make ${{ matrix.ops }}
- name: make ${{ matrix.ops }}
- name: make test ${{ matrix.ops }}
if: ${{ contains(matrix.ops, '/') }}
run: |
make test TEST_PACKAGES=./${{ matrix.ops }}/...
Expand Down
4 changes: 4 additions & 0 deletions controllers/apps/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -139,6 +140,9 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
if re, ok := err.(intctrlutil.RequeueError); ok {
return intctrlutil.RequeueAfter(re.RequeueAfter(), reqCtx.Log, re.Reason())
}
if apierrors.IsConflict(err) {
return intctrlutil.Requeue(reqCtx.Log, err.Error())
}
return intctrlutil.RequeueWithError(err, reqCtx.Log, "")
}

Expand Down
38 changes: 11 additions & 27 deletions controllers/apps/cluster_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ const (
clusterWeight
)

// TODO: cluster plan builder can be abstracted as a common flow

// clusterTransformContext a graph.TransformContext implementation for Cluster reconciliation
type clusterTransformContext struct {
context.Context
Expand Down Expand Up @@ -233,21 +231,21 @@ func NewClusterPlanBuilder(ctx intctrlutil.RequestCtx, cli client.Client, req ct
func (c *clusterPlanBuilder) defaultWalkFuncWithLogging(vertex graph.Vertex) error {
node, ok := vertex.(*model.ObjectVertex)
err := c.defaultWalkFunc(vertex)
if err != nil {
if !ok {
c.transCtx.Logger.Error(err, "")
} else {
if node.Action == nil {
c.transCtx.Logger.Error(err, fmt.Sprintf("%T", node))
} else {
c.transCtx.Logger.Error(err, fmt.Sprintf("%s %T error", *node.Action, node.Obj))
}
}
switch {
case err == nil:
return err
case !ok:
c.transCtx.Logger.Error(err, "")
case node.Action == nil:
c.transCtx.Logger.Error(err, fmt.Sprintf("%T", node))
case apierrors.IsConflict(err):
return err
default:
c.transCtx.Logger.Error(err, fmt.Sprintf("%s %T error", *node.Action, node.Obj))
}
return err
}

// TODO: retry strategy on error
func (c *clusterPlanBuilder) defaultWalkFunc(vertex graph.Vertex) error {
node, ok := vertex.(*model.ObjectVertex)
if !ok {
Expand Down Expand Up @@ -281,7 +279,6 @@ func (c *clusterPlanBuilder) reconcileObject(node *model.ObjectVertex) error {
case model.PATCH:
patch := client.MergeFrom(node.OriObj)
if err := c.cli.Patch(c.transCtx.Context, node.Obj, patch); err != nil && !apierrors.IsNotFound(err) {
c.transCtx.Logger.Error(err, fmt.Sprintf("patch %T error", node.OriObj))
return err
}
case model.DELETE:
Expand All @@ -294,7 +291,6 @@ func (c *clusterPlanBuilder) reconcileObject(node *model.ObjectVertex) error {
// delete secondary objects
if _, ok := node.Obj.(*appsv1alpha1.Cluster); !ok {
err := intctrlutil.BackgroundDeleteObject(c.cli, c.transCtx.Context, node.Obj)
// err := c.cli.Delete(c.transCtx.Context, node.obj)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
Expand Down Expand Up @@ -323,20 +319,8 @@ func (c *clusterPlanBuilder) reconcileCluster(node *model.ObjectVertex) error {
// cluster.meta and cluster.spec might change
case model.STATUS:
if !reflect.DeepEqual(cluster.ObjectMeta, origCluster.ObjectMeta) || !reflect.DeepEqual(cluster.Spec, origCluster.Spec) {
// TODO: we should Update instead of Patch cluster object,
// TODO: but Update failure happens too frequently as other controllers are updating cluster object too.
// TODO: use Patch here, revert to Update after refactoring done
// if err := c.cli.Update(c.ctx.Ctx, cluster); err != nil {
// tmpCluster := &appsv1alpha1.Cluster{}
// err = c.cli.Get(c.ctx.Ctx,client.ObjectKeyFromObject(origCluster), tmpCluster)
// c.ctx.Log.Error(err, fmt.Sprintf("update %T error, orig: %v, curr: %v, api-server: %v", origCluster, origCluster, cluster, tmpCluster))
// return err
// }
patch := client.MergeFrom(origCluster.DeepCopy())
if err := c.cli.Patch(c.transCtx.Context, cluster, patch); err != nil {
// log for debug
// TODO:(free6om) make error message smaller when refactor done.
c.transCtx.Logger.Error(err, fmt.Sprintf("patch %T error, orig: %v, curr: %v", origCluster, origCluster, cluster))
return err
}
}
Expand Down
4 changes: 4 additions & 0 deletions controllers/workloads/replicatedstatemachine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -74,6 +75,9 @@ func (r *ReplicatedStateMachineReconciler) Reconcile(ctx context.Context, req ct
if re, ok := err.(model.RequeueError); ok {
return intctrlutil.RequeueAfter(re.RequeueAfter(), reqCtx.Log, re.Reason())
}
if apierrors.IsConflict(err) {
return intctrlutil.Requeue(reqCtx.Log, err.Error())
}
return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "")
}

Expand Down
4 changes: 2 additions & 2 deletions lorry/component/postgres/officalpostgres/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestGetMemberAddrs(t *testing.T) {
ctx := context.TODO()
manager, mock, _ := MockDatabase(t)
defer mock.Close()
cluster := &dcs.Cluster{}
cluster := &dcs.Cluster{Namespace: "default"}

t.Run("get empty addrs", func(t *testing.T) {
addrs := manager.GetMemberAddrs(ctx, cluster)
Expand All @@ -182,7 +182,7 @@ func TestGetMemberAddrs(t *testing.T) {
addrs := manager.GetMemberAddrs(ctx, cluster)

assert.Equal(t, 1, len(addrs))
assert.Equal(t, "test.pg-headless:5432", addrs[0])
assert.Equal(t, "test.pg-headless.default.svc.cluster.local:5432", addrs[0])
})
}

Expand Down
6 changes: 5 additions & 1 deletion lorry/dcs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ package dcs

import (
"fmt"

"github.com/apecloud/kubeblocks/pkg/constant"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
)

type Cluster struct {
Expand Down Expand Up @@ -91,7 +94,8 @@ func (c *Cluster) GetMemberAddrWithPort(member Member) string {
}

func (c *Cluster) GetMemberAddr(member Member) string {
return fmt.Sprintf("%s.%s-headless", member.Name, c.ClusterCompName)
clusterDomain := viper.GetString(constant.KubernetesClusterDomainEnv)
return fmt.Sprintf("%s.%s-headless.%s.svc.%s", member.Name, c.ClusterCompName, c.Namespace, clusterDomain)
}

func (c *Cluster) GetMemberAddrWithName(name string) string {
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/rsm/plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ func (b *PlanBuilder) rsmWalkFunc(v graph.Vertex) error {
case model.UPDATE:
err := b.cli.Update(b.transCtx.Context, vertex.Obj)
if err != nil && !apierrors.IsNotFound(err) {
b.transCtx.Logger.Error(err, fmt.Sprintf("update %T error: %s", vertex.Obj, vertex.OriObj.GetName()))
return err
}
case model.DELETE:
Expand Down

0 comments on commit 2532217

Please sign in to comment.