Skip to content

Testing the `http` sink

neuronull edited this page Aug 17, 2023 · 2 revisions

Setup

Method 1

# Fetch the source of the test HTTP server.
git clone https://github.com/vectordotdev/http_test_server.git
cd http_test_server
# Compile the server (requires a Go toolchain on the PATH).
go build
# Start the freshly built binary from the repository directory.
./http_test_server

Method 2

You can set up a local Python HTTP server, e.g.:

from http.server import BaseHTTPRequestHandler, HTTPServer

class MyHTTPRequestHandler(BaseHTTPRequestHandler):
    """Minimal request handler for manually testing the `http` sink.

    GET requests receive a fixed greeting. POST requests have their body
    read and echoed back in the response text.
    """

    def do_GET(self):
        """Answer a GET request with a fixed 200 response."""
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(b'Hello, this is a GET response.')

    def do_POST(self):
        """Read the request body and echo it back in a 200 response."""
        # A request without a Content-Length header is treated as having an
        # empty body instead of crashing on int(None). Negative values are
        # clamped to 0 because rfile.read() with a negative size would read
        # until EOF and stall the handler.
        content_length = max(int(self.headers.get('Content-Length', 0)), 0)
        post_data = self.rfile.read(content_length)

        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        # post_data is bytes, so it is rendered using its bytes repr (b'...').
        response_message = f'Hello, this is a POST response. Post data: {post_data}'
        self.wfile.write(response_message.encode('utf-8'))


def run_server(host='localhost', port=8080):
    """Serve `MyHTTPRequestHandler` on ``host:port`` until interrupted.

    Args:
        host: Interface to bind to. Defaults to ``'localhost'``.
        port: TCP port to listen on. Defaults to ``8080``.

    The call blocks in ``serve_forever()``. Pressing Ctrl-C
    (``KeyboardInterrupt``) stops the server and prints a message.
    """
    server = HTTPServer((host, port), MyHTTPRequestHandler)
    print(f'Server running on {host}:{port}')

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print('Server stopped')
    finally:
        # Release the listening socket however serve_forever() exits,
        # not only on Ctrl-C.
        server.server_close()

# Start the server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    run_server()

Config

# Turn on the `api` section.
[api]
enabled = true

# Source named `demo_logs`: emits demo log events in JSON format.
[sources.demo_logs]
type = "demo_logs"
format = "json"
interval = 1

# Sink under test: forwards events from `demo_logs` to the HTTP server
# listening on localhost:8080 (the address the Python example server binds to).
[sinks.http]
type = "http"
inputs = [ "demo_logs"]
uri = "http://localhost:8080"
encoding.codec = "native"
request.concurrency = "adaptive"