Skip to content

Commit

Permalink
Merge pull request #207 from brokercap/v2.0.2
Browse files Browse the repository at this point in the history
v2.0.2
  • Loading branch information
jc3wish authored Oct 27, 2022
2 parents cc508a5 + 7ff900a commit 8fc1d06
Show file tree
Hide file tree
Showing 14 changed files with 220 additions and 131 deletions.
2 changes: 1 addition & 1 deletion config/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ limitations under the License.

package config

const VERSION = "v2.0.1-beta"
const VERSION = "v2.0.2-beta"
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/smartystreets/goconvey v1.7.2 // indirect
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94
github.com/syndtr/goleveldb v1.0.0
github.com/xdg/scram v1.0.5 // indirect
golang.org/x/net v0.0.0-20210427231257-85d9c07bbe3a
golang.org/x/sys v0.0.0-20210112080510-489259a85091
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
Expand Down
4 changes: 3 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFd
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/scram v1.0.3/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw=
github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.3 h1:cmL5Enob4W83ti/ZHuZLuKD/xqJfus4fVPwE+/BDm+4=
github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v0.18.0 h1:d5Of7+Zw4ANFOJB+TIn2K3QWsgS2Ht7OU9DqZHI6qu8=
Expand Down
10 changes: 9 additions & 1 deletion input/kafka/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kafka

import (
Expand All @@ -21,9 +22,14 @@ import (
)

// kafka 数据出来之后调用,再进行调用子类的callback函数进行数据解析

func (c *InputKafka) ToChildCallback(kafkaMsg *sarama.ConsumerMessage) {
if c.childCallBack != nil {
c.err = c.childCallBack(kafkaMsg)
func() {
c.Lock()
defer c.Unlock()
c.err = c.childCallBack(kafkaMsg)
}()
// 假如设置了跳过序列化操作,则直接进行跳过
if c.err != nil {
if c.config != nil && c.config.SkipSerializeErr == true {
Expand All @@ -39,13 +45,15 @@ func (c *InputKafka) ToChildCallback(kafkaMsg *sarama.ConsumerMessage) {
}

// 由子类 对kafkaMsg 进行解析完后,再进行回调到此函数

func (c *InputKafka) ToInputCallback(data *outputDriver.PluginDataType) {
c.callback(data)
commitEventData := c.BuildCommitEventAndCallback(data)
c.callback(commitEventData)
}

// 为每一行数据生成一个commit event 事件

func (c *InputKafka) BuildCommitEventAndCallback(data *outputDriver.PluginDataType) *outputDriver.PluginDataType {
newData := &outputDriver.PluginDataType{
Timestamp: data.Timestamp,
Expand Down
9 changes: 9 additions & 0 deletions input/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type SaslConfig struct {
SaslMechanism string `json:"net.sasl.mechanism"`
SaslUser string `json:"net.sasl.user"`
SaslPassword string `json:"net.sasl.password"`
SCRAMAuthzID string `json:"net.sasl.SCRAMAuthzID"`
}

type ConnectorParamConfig struct {
Expand Down Expand Up @@ -121,9 +122,17 @@ func getKafkaConnectConfig(config map[string]string) (kafkaConnectConfig *Config
}

if rawConfig.SaslConfig != nil {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = rawConfig.SaslUser
cfg.Net.SASL.Password = rawConfig.SaslPassword
cfg.Net.SASL.Mechanism = sarama.SASLMechanism(rawConfig.SaslMechanism)
cfg.Net.SASL.SCRAMAuthzID = rawConfig.SCRAMAuthzID
switch cfg.Net.SASL.Mechanism {
case sarama.SASLTypeSCRAMSHA256:
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &SCRAMClient{HashGeneratorFcn: SHA256} }
case sarama.SASLTypeSCRAMSHA512:
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &SCRAMClient{HashGeneratorFcn: SHA512} }
}
}
if rawConfig.TLSConfig != nil && rawConfig.TLSEnabled {
cfg.Net.TLS.Enable = true
Expand Down
3 changes: 3 additions & 0 deletions input/kafka/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type InputKafka struct {
kafkaGroupCtx context.Context
kafkaGroupCancel context.CancelFunc

consumeClaimCtx context.Context
consumeClaimCancle context.CancelFunc

topics map[string]map[string]bool

positionMap map[string]map[int32]int64
Expand Down
22 changes: 19 additions & 3 deletions input/kafka/kafka_consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kafka

import (
Expand All @@ -29,9 +30,7 @@ func (c *InputKafka) Cleanup(sarama.ConsumerGroupSession) error {
}

func (c *InputKafka) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ConsumeClaimCtx, ConsumeClaimCancle := context.WithCancel(c.kafkaGroupCtx)
defer ConsumeClaimCancle()
go c.ConsumePluginPosition(sess, ConsumeClaimCtx)
go c.StartConsumePluginPosition(sess)
for {
select {
case kafkaMsg := <-claim.Messages():
Expand All @@ -46,3 +45,20 @@ func (c *InputKafka) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama
}
return nil
}

func (c *InputKafka) StartConsumePluginPosition(sess sarama.ConsumerGroupSession) {
c.Lock()
if c.consumeClaimCtx != nil {
c.Unlock()
return
}
c.consumeClaimCtx, c.consumeClaimCancle = context.WithCancel(c.kafkaGroupCtx)
c.Unlock()
defer func() {
c.consumeClaimCtx = nil
c.consumeClaimCancle = nil
}()
defer c.consumeClaimCancle()

c.ConsumePluginPosition(sess, c.consumeClaimCtx)
}
37 changes: 37 additions & 0 deletions input/kafka/kafka_scram_auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package kafka

import (
"crypto/sha256"
"crypto/sha512"

"github.com/xdg/scram"
)

var (
SHA256 scram.HashGeneratorFcn = sha256.New
SHA512 scram.HashGeneratorFcn = sha512.New
)

type SCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *SCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *SCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

func (x *SCRAMClient) Done() bool {
return x.ClientConversation.Done()
}
4 changes: 2 additions & 2 deletions input/kafka/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
"strings"
)

// 最上层回调 已经加锁

func (c *InputKafka) SetTopicPartitionOffsetAndReturnGTID(kafkaMsg *sarama.ConsumerMessage) (GTID string) {
c.Lock()
defer c.Unlock()
if kafkaMsg != nil {
var ok bool
if _, ok = c.positionMap[kafkaMsg.Topic]; !ok {
Expand Down
52 changes: 26 additions & 26 deletions input/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,37 @@ var MySQLBinlogDump string

type MysqlInput struct {
inputDriver.PluginDriverInterface
inputInfo inputDriver.InputInfo
binlogDump *mysqlDriver.BinlogDump
reslut chan error
status inputDriver.StatusFlag
err error
inputInfo inputDriver.InputInfo
binlogDump *mysqlDriver.BinlogDump
reslut chan error
status inputDriver.StatusFlag
err error
PluginStatusChan chan *inputDriver.PluginStatus
eventID uint64
callback inputDriver.Callback
eventID uint64
callback inputDriver.Callback
}

func NewInputPlugin () inputDriver.Driver {
func NewInputPlugin() inputDriver.Driver {
return &MysqlInput{}
}

func (c *MysqlInput) GetUriExample() (string,string) {
func (c *MysqlInput) GetUriExample() (string, string) {
notesHtml := `
<p><span class="help-block m-b-none">授权权限例子:GRANT SELECT, SHOW DATABASES, SUPER, REPLICATION SLAVE, EVENT ON *.* TO 'xxtest'@'%'</span></p>
<p><span class="help-block m-b-none">RDS云产品数据库云产品权限,可能不是按 MySQL 开源权限来的,请自行确认是否有足够权限(不要勾选 验证是否有足够权限 选项)</span></p>
<p><span class="help-block m-b-none">自确认权限:kill 当前帐号的连接, SET 命令权限,SHOW EVENT 权限 等</span></p>
<p><span class="help-block m-b-none">需要SQL权限细节,请参考 <a href="/docs" target="_blank">DOC文档</a></span></p>
`
return "root:root@tcp(127.0.0.1:3306)/test",notesHtml
return "root:root@tcp(127.0.0.1:3306)/test", notesHtml
}

func (c *MysqlInput) SetOption(inputInfo inputDriver.InputInfo,param map[string]interface{}) {
func (c *MysqlInput) SetOption(inputInfo inputDriver.InputInfo, param map[string]interface{}) {
c.inputInfo = inputInfo
}

func (c *MysqlInput) Start(ch chan *inputDriver.PluginStatus) error {
switch c.status {
case inputDriver.STOPPING,inputDriver.STOPPED:
case inputDriver.STOPPING, inputDriver.STOPPED:
return c.Start1()
default:
c.PluginStatusChan = ch
Expand All @@ -50,7 +50,7 @@ func (c *MysqlInput) Start(ch chan *inputDriver.PluginStatus) error {
}

func (c *MysqlInput) Start0() error {
c.reslut = make(chan error,1)
c.reslut = make(chan error, 1)
c.binlogDump = mysqlDriver.NewBinlogDump(
c.inputInfo.ConnectUri,
c.MySQLCallback,
Expand All @@ -61,12 +61,12 @@ func (c *MysqlInput) Start0() error {
mysqlDriver.WRITE_ROWS_EVENTv1, mysqlDriver.UPDATE_ROWS_EVENTv1, mysqlDriver.DELETE_ROWS_EVENTv1,
mysqlDriver.WRITE_ROWS_EVENTv0, mysqlDriver.UPDATE_ROWS_EVENTv0, mysqlDriver.DELETE_ROWS_EVENTv0,
},
nil,nil)
nil, nil)
c.binlogDump.SetNextEventID(c.eventID)
if c.inputInfo.IsGTID && c.inputInfo.GTID == "" {
if !c.inputInfo.IsGTID || c.inputInfo.GTID == "" {
go c.binlogDump.StartDumpBinlog(c.inputInfo.BinlogFileName, c.inputInfo.BinlogPostion, c.inputInfo.ServerId, c.reslut, c.inputInfo.MaxFileName, c.inputInfo.MaxPosition)
}else{
log.Println("c.inputInfo.GTID:",c.inputInfo.GTID," c.inputInfo.ServerId:",c.inputInfo.ServerId)
} else {
log.Println("c.inputInfo.GTID:", c.inputInfo.GTID, " c.inputInfo.ServerId:", c.inputInfo.ServerId)
go c.binlogDump.StartDumpBinlogGtid(c.inputInfo.GTID, c.inputInfo.ServerId, c.reslut)
}
go c.monitorDump()
Expand All @@ -86,7 +86,7 @@ func (c *MysqlInput) monitorDump() (r bool) {
}()
for {
select {
case v := <- c.reslut:
case v := <-c.reslut:
if v == nil {
return
}
Expand All @@ -112,7 +112,7 @@ func (c *MysqlInput) monitorDump() (r bool) {
}
break
}
c.PluginStatusChan <- &inputDriver.PluginStatus{Status:c.status , Error: c.err}
c.PluginStatusChan <- &inputDriver.PluginStatus{Status: c.status, Error: c.err}

}
return true
Expand All @@ -134,16 +134,16 @@ func (c *MysqlInput) Kill() error {
}

func (c *MysqlInput) GetLastPosition() *inputDriver.PluginPosition {
FileName,Position,Timestamp,GTID,LastEventID := c.binlogDump.GetBinlog()
FileName, Position, Timestamp, GTID, LastEventID := c.binlogDump.GetBinlog()
if FileName == "" {
return nil
}
return &inputDriver.PluginPosition{
GTID:GTID,
BinlogFileName:FileName,
BinlogPostion:Position,
Timestamp:Timestamp,
EventID:LastEventID,
GTID: GTID,
BinlogFileName: FileName,
BinlogPostion: Position,
Timestamp: Timestamp,
EventID: LastEventID,
}
}

Expand All @@ -154,4 +154,4 @@ func (c *MysqlInput) SetEventID(eventId uint64) error {

func (c *MysqlInput) SetCallback(callback inputDriver.Callback) {
c.callback = callback
}
}
Loading

0 comments on commit 8fc1d06

Please sign in to comment.