Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add sink clickhouse #619

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

zhu733756
Copy link
Contributor

@zhu733756 zhu733756 commented Aug 25, 2023

Proposed Changes:

  • Add Sink Clickhouse

Which issue(s) this PR fixes:

Fixes #494

Additional documentation:

How to run it with source kubeEvents:

1.  Exec ddl on your clickhouse server
---
CREATE DATABASE events;
CREATE TABLE events.events (
    namespace String,
    name String,
    selfLink String,
    uid String,
    resourceVersion String,
    creationTimestamp String,
    managedFields String,
    kind String,
    apiVersion String,
    fieldPath String,
    reason String,
    message String,
    host String,
    firstTimestamp String,
    lastTimestamp String,
    count Float64,
    type String,
    eventTime String,
    reportingComponent String,
    reportingInstance String
) ENGINE = MergeTree() ORDER BY (namespace, name, creationTimestamp)
---
2. git clone https://github.com/zhu733756/loggie
3. add vscode launch.json (cd loggie && mkdir -p .vscode)
---
{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "loggie-clickhouse-sink",
            "type": "go",
            "request": "launch",
            "mode": "auto",
            "program": "${workspaceFolder}/cmd/loggie/main.go",
            "args": ["--config.system", "/root/go/src/github.com/loggie/.vscode/loggie.yml", "--config.pipeline", "/root/go/src/github.com/loggie/.vscode/pipeline.yml", "--meta.nodeName", "nodeX", "--log.level","debug"]
        }
    ]
}
---
4.loggie.yml(default)
---
loggie:
  reload:
    enabled: true
    period: 10s

  monitor:
    logger:
      period: 30s
      enabled: true
    listeners:
      filesource: ~
      filewatcher: ~
      reload: ~
      sink: ~
      queue: ~
      pipeline: ~
      sys: ~

  discovery:
    enabled: false
    kubernetes:
      containerRuntime: containerd
      kubeconfig: "/root/.kube/config"
      typePodFields:
        namespace: "${_k8s.pod.namespace}"
        podname: "${_k8s.pod.name}"
        containername: "${_k8s.pod.container.name}"
        nodename: "${_k8s.node.name}"
        logconfig: "${_k8s.logconfig}"

  defaults:
    sink:
      type: dev
    sources:
      - type: file
        timestampKey: "@timestamp"
        bodyKey: "message"
        fieldsUnderRoot: true
        addonMeta: true
        addonMetaSchema:
          underRoot: true
          fields:
            filename: "${_meta.filename}"
            line: "${_meta.line}"
        watcher:
          maxOpenFds: 6000
  http:
    enabled: true
---
5. pipeline.yml
---
pipelines:
- name: clickhouse
  sources:
  - type: kubeEvent
    name: events
    kubeconfig: "/root/.kube/config"
  queue:
    type: channel
    batchSize: 2048000 
    batchBytes: 33554432000 
    batchAggTimeout: 10s
  interceptors:
  - type: transformer
    name: jsonDecode
    actions:
    - if: exist(body)
      then:
      - action: jsonDecode(body)
  - type: transformer
    name: underRoot
    actions:
    - if: exist(metadata)
      then:
      - action: underRoot(metadata)
    - if: exist(involvedObject)
      then:
      - action: underRoot(involvedObject)
    - if: exist(source)
      then:
      - action: underRoot(source)
  - type: transformer
    name: setField
    actions:
    - action: set(managedFields, "")
  - type: transformer
    name: setEmpty
    actions:
    - if: NOT exist(host)
      then:
      - action: set(host, "")
    - if: NOT exist(fieldPath)
      then:
      - action: set(fieldPath, "")
    - if: NOT exist(apiVersion)
      then:
      - action: set(apiVersion, "")
    - if: NOT exist(count)
      then:
        - action: set(count, 0)
        - action: strconv(count, float)
        - action: return()
      else:
        - action: return()
  - type: maxbytes
    maxBytes: 1048576
  - type: rateLimit
    qps: 10000
  sink:
    type: clickhouse
    addr: ["xx.xx.xx.xx:9000"]
    user: "defaut"
    password: "defaut"
    database: "events"
    table: "events"
    debug: true
---
6. Click Vscode Debug button would run it.

@zhu733756 zhu733756 force-pushed the feat-sink-clickhouse branch from 9e7f99e to 53e4887 Compare August 25, 2023 04:22
Signed-off-by: zhuhan <[email protected]>
Signed-off-by: zhuhan <[email protected]>
@zhu733756
Copy link
Contributor Author

/cc @ethfoo

pkg/include/core.go Outdated Show resolved Hide resolved
Signed-off-by: zhuhan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

什么时候支持sink到clickhouse
2 participants