Skip to content

Commit

Permalink
Fix Blackhole failpoint in the proxy does not block all updates etcd-…
Browse files Browse the repository at this point in the history
…io#17737

[Problem statement]

The blackhole() leverages proxy to drop all the incoming and outgoing
traffic passing through it, but the proxy doesn't properly block out all
the communication channels for a given member, due to the fact that the
member initiates connections from itself to others.

[Overview of the proposed solutions]

The proposed implementation here performs traffic blocking at L7
(application layer). There is another idea from fuweid performs blocking
at L4, as mentioned in reference [1].

[Root cause]
(Diagrams credit: fuweid)

Let's assuming the following:
- Current member ID (the member we would like to perform blackhole on): A
- Other member ID: B, C

In the e2e test set up, let's breakdown how A is able to communicate
with its peers.

For the case of incoming connections from B and C to A, the connections
from B and C will be established through A-proxy.

B -----> A-Proxy -----> A
           ^
           |
           C

For the case of establishing outgoing connections from A to B and C,
the connections from A will be established through B-Proxy and C-Proxy,
before reaching B and C, respectively.

A -----> B-Proxy ----> B
|
+--------> C-Proxy ---> C

As you can see, currently the `BlackholeTx` and `BlackholeRx` only blocks
traffic between `X-Proxy <---> X`. Thus, only the externally-established
incoming traffic will be block (B and C to A).

[Implementation]

For each member, it has 2 types of communication channels, namely stream
and pipeline. Connections made by pipeline is not persisted, but for
stream the connection is continuously used by the member for long-poling.

In order to block the outgoing connections, we can hijack and drop the
data from `RoundTrip` and `ServeHTTP`.

When establishing a new connection, `RoundTrip` will be called. By using
a failpoint, we can drop the data carried by `Body` in RequestBody
on-demand.

When accepting a new connection, `ServeHTTP` will be called. By using a
failpoint, we can drop the data carried by `Body` in RequestBody, and
drop the data written to `ResponseWriter` on-demand.

If a connection is already opened (like a stream), because we hijacked
the `Reader` and `Writer` interface, we can still drop traffic as
desired.

[Discussion]

The downside of this approach is that we are introducing a custom
implementation of `RoundTrip`, as we are hijacking the connection.

[Testing]

make gofail-enable && make build && make gofail-disable && \
go test -timeout 60s -run ^TestBlackholeByMockingPartitionLeader$ go.etcd.io/etcd/tests/v3/e2e -v -count=1

make gofail-enable && make build && make gofail-disable && \
go test -timeout 60s -run ^TestBlackholeByMockingPartitionFollower$ go.etcd.io/etcd/tests/v3/e2e -v -count=1

References:
[1] etcd-io#17737 (comment)
[2] https://github.com/etcd-io/etcd/pull/17790/files#diff-f01210a3082e25ff00682648f32122941a0c275b3926a8da37447589fe2ede1aR109

Signed-off-by: Chun-Hung Tseng <[email protected]>
  • Loading branch information
henrybear327 committed Apr 17, 2024
1 parent c6a2d1d commit 2f4e864
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 3 deletions.
57 changes: 57 additions & 0 deletions server/etcdserver/api/rafthttp/hijackedReadCloser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rafthttp

import (
"io"
"net/http"
)

type hijackedReadCloser struct {
originalReadCloser io.ReadCloser
}

func (h *hijackedReadCloser) Read(p []byte) (int, error) {
// gofail: var DemoDropRequestBodyFailPoint struct{}
// return discardReadData(h.originalReadCloser, p)

if h.originalReadCloser == nil {
return 0, nil
}
return h.originalReadCloser.Read(p)
}

func (h *hijackedReadCloser) Close() error {
if h.originalReadCloser == nil {
return nil
}
return h.originalReadCloser.Close()
}

/* helper functions */
func hijackRequestBody(r *http.Request) {
r.Body = &hijackedReadCloser{
originalReadCloser: r.Body,
}
}

func discardReadData(rc io.ReadCloser, p []byte) (int, error) {
// return rc.Read(make([]byte, len(p)))

_, err := rc.Read(make([]byte, len(p)))
return 0, err // discard data but return original error

// return 0, nil
}
39 changes: 39 additions & 0 deletions server/etcdserver/api/rafthttp/hijackedRoundTripper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rafthttp

import "net/http"

/* for stream */
type hijackedStreamRoundTripper struct {
// in order to preserve the already configured Transport for pipeline and stream
http.Transport
}

func (t *hijackedStreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
hijackRequestBody(r)
return t.Transport.RoundTrip(r)
}

/* for pipeline */

type hijackedPipelineRoundTripper struct {
http.Transport
}

func (t *hijackedPipelineRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
hijackRequestBody(r)
return t.Transport.RoundTrip(r)
}
56 changes: 56 additions & 0 deletions server/etcdserver/api/rafthttp/hijackedWriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rafthttp

import (
"net/http"
)

type hijackedResponseWriter struct {
originalResponseWriter http.ResponseWriter
}

func (h *hijackedResponseWriter) Header() http.Header {
return h.originalResponseWriter.Header()
}

func (h *hijackedResponseWriter) Write(p []byte) (int, error) {
// gofail: var DemoStreamHandlerWriterFailPoint struct{}
// return discardWriteData(p)

if h.originalResponseWriter == nil {
return 0, nil
}
return h.originalResponseWriter.Write(p)
}

func (h *hijackedResponseWriter) WriteHeader(statusCode int) {
h.originalResponseWriter.WriteHeader(statusCode)
}

func (h *hijackedResponseWriter) Flush() {
h.originalResponseWriter.(http.Flusher).Flush()
}

/* helper functions */
func hijackResponseWriter(w http.ResponseWriter) *hijackedResponseWriter {
return &hijackedResponseWriter{
originalResponseWriter: w,
}
}

func discardWriteData(p []byte) (int, error) {
return 0, nil
}
9 changes: 9 additions & 0 deletions server/etcdserver/api/rafthttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ func newPipelineHandler(t *Transport, r Raft, cid types.ID) http.Handler {
}

func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
hijackRequestBody(r)
w = hijackResponseWriter(w)

if r.Method != "POST" {
w.Header().Set("Allow", "POST")
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
Expand Down Expand Up @@ -197,6 +200,9 @@ const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"
// received and processed.
// 2. this case should happen rarely, so no further optimization is done.
func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
hijackRequestBody(r)
w = hijackResponseWriter(w)

start := time.Now()

if r.Method != "POST" {
Expand Down Expand Up @@ -346,6 +352,9 @@ func newStreamHandler(t *Transport, pg peerGetter, r Raft, id, cid types.ID) htt
}

func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
hijackRequestBody(r)
w = hijackResponseWriter(w)

if r.Method != "GET" {
w.Header().Set("Allow", "GET")
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
Expand Down
12 changes: 11 additions & 1 deletion server/etcdserver/api/rafthttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type Transporter interface {
Stop()
}

// Transport implements Transporter interface. It provides the functionality
// Transport implements etcd's Transporter interface. It provides the functionality
// to send raft messages to peers, and receive raft messages from peers.
// User should call Handler method to get a handler to serve requests
// received from peerURLs.
Expand Down Expand Up @@ -140,6 +140,16 @@ func (t *Transport) Start() error {
if err != nil {
return err
}

// make sure that the transport is copied over so we are using the right parameters for the connections
t.streamRt = &hijackedStreamRoundTripper{
Transport: *t.streamRt.(*http.Transport).Clone(),
}

t.pipelineRt = &hijackedPipelineRoundTripper{
Transport: *t.pipelineRt.(*http.Transport).Clone(),
}

t.remotes = make(map[types.ID]*remote)
t.peers = make(map[types.ID]Peer)
t.pipelineProber = probing.NewProber(t.pipelineRt)
Expand Down
13 changes: 13 additions & 0 deletions tests/e2e/blackhole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
e2e.WithSnapshotCatchUpEntries(10),
e2e.WithIsPeerTLS(true),
e2e.WithPeerProxy(true),
e2e.WithGoFailEnabled(true),
)
require.NoError(t, err, "failed to start etcd cluster: %v", err)
defer func() {
Expand All @@ -64,6 +65,12 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
t.Logf("Blackholing traffic from and to member %q", partitionedMember.Config().Name)
proxy.BlackholeTx()
proxy.BlackholeRx()
if err := partitionedMember.Failpoints().SetupHTTP(context.Background(), "DemoDropRequestBodyFailPoint", `sleep("0.05s")`); err != nil {
t.Fatal(err)
}
if err := partitionedMember.Failpoints().SetupHTTP(context.Background(), "DemoStreamHandlerWriterFailPoint", `sleep("0.05s)`); err != nil {
t.Fatal(err)
}

t.Logf("Wait 5s for any open connections to expire")
time.Sleep(5 * time.Second)
Expand All @@ -83,6 +90,12 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
t.Logf("Unblackholing traffic from and to member %q", partitionedMember.Config().Name)
proxy.UnblackholeTx()
proxy.UnblackholeRx()
if err := partitionedMember.Failpoints().DeactivateHTTP(context.Background(), "DemoDropRequestBodyFailPoint"); err != nil {
t.Fatal(err)
}
if err := partitionedMember.Failpoints().DeactivateHTTP(context.Background(), "DemoStreamHandlerWriterFailPoint"); err != nil {
t.Fatal(err)
}

leaderEPC = epc.Procs[epc.WaitLeader(t)]
time.Sleep(5 * time.Second)
Expand Down
4 changes: 2 additions & 2 deletions tests/robustness/makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ GOFAIL_VERSION = $(shell cd tools/mod && go list -m -f {{.Version}} go.etcd.io/g

.PHONY: gofail-enable
gofail-enable: install-gofail
gofail enable server/etcdserver/ server/lease/leasehttp server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
gofail enable server/etcdserver/ server/etcdserver/api/rafthttp server/lease/leasehttp server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
cd ./server && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdutl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdctl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./tests && go get go.etcd.io/gofail@${GOFAIL_VERSION}

.PHONY: gofail-disable
gofail-disable: install-gofail
gofail disable server/etcdserver/ server/lease/leasehttp server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
gofail disable server/etcdserver/ server/etcdserver/api/rafthttp server/lease/leasehttp server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
cd ./server && go mod tidy
cd ./etcdutl && go mod tidy
cd ./etcdctl && go mod tidy
Expand Down

0 comments on commit 2f4e864

Please sign in to comment.