-
Notifications
You must be signed in to change notification settings - Fork 7
Increase number of connectivity options #31
Conversation
Resolves #30 |
@candysmurf what is the reason of using your cassandra image instead of official one? |
cassandra/cassandra.go
Outdated
@@ -75,6 +83,41 @@ func (cas *CassandraPublisher) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) | |||
serverAddrRule.Description = "Cassandra server" | |||
config.Add(serverAddrRule) | |||
|
|||
portRule, err := cpolicy.NewIntegerRule(portRuleKey, false, 9042) | |||
handleErr(err) | |||
serverAddrRule.Description = "Cassandra server port, default: 9042" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
portRule.Description
cassandra/cassandra.go
Outdated
|
||
timeoutRule, err := cpolicy.NewIntegerRule(timeoutRuleKey, false, 2) | ||
handleErr(err) | ||
serverAddrRule.Description = "Connection timeout in seconds, default: 2" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timeoutRule.Description
cassandra/cassandra.go
Outdated
|
||
connectionTimeoutRule, err := cpolicy.NewIntegerRule(connectionTimeoutRuleKey, false, 2) | ||
handleErr(err) | ||
serverAddrRule.Description = "Initial connection timeout in seconds, default: 2" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
connectionTimeoutRule
cassandra/cassandra.go
Outdated
|
||
initialHostLookupRule, err := cpolicy.NewBoolRule(initialHostLookupRuleKey, false, true) | ||
handleErr(err) | ||
serverAddrRule.Description = "Lookup for cluster hosts information, default: true" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initialHostLookupRule
cassandra/cassandra.go
Outdated
|
||
ignorePeerAddrRule, err := cpolicy.NewBoolRule(ignorePeerAddrRuleKey, false, false) | ||
handleErr(err) | ||
serverAddrRule.Description = "Turn off cluster hosts tracking, default: false" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ignorePeerAddrRule
cassandra/cassandra.go
Outdated
serverAddrRule.Description = "Turn off cluster hosts tracking, default: false" | ||
config.Add(ignorePeerAddrRule) | ||
|
||
dontCreateKeyspaceRule, err := cpolicy.NewBoolRule(createKeyspaceRuleKey, false, true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
createKeyspaceRule
cassandra/cassandra.go
Outdated
@@ -223,7 +288,7 @@ func getSslOptions(cfg map[string]ctypes.ConfigValue) *sslOptions { | |||
|
|||
func handleErr(e error) { | |||
if e != nil { | |||
log.Fatal(e.Error()) | |||
log.Fatalf("ABC %s", e.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does "ABC" represent here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was for debug purposes, I've forgot to remove it :/
log.Fatal(err.Error()) | ||
} | ||
|
||
if err := session.Query(createTagTableCQL).Exec(); err != nil { | ||
if err := session.Query(fmt.Sprintf(createTagTableCQL, co.keyspace)).Exec(); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we allow a user to specify the table name similar like the keyspace but default to snap
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can do this, but I propose to use metrics
instead of snap
;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about tag
table?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@squall0gd, tag table is a by-product. Let us leave it. Also metric as the table name seems fine if we don't want to change too much now.
cassandra/client.go
Outdated
cluster := createCluster(co.server) | ||
func createKeyspace(co clientOptions) { | ||
cluster := createCluster(co) | ||
session, err := cluster.CreateSession() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using getInstance(), singleton session is desired.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know, but you need to create keyspace separately. Your configured session isn't able to switch to new keyspace. This case will remove keyspace name from rest of queries. Another approach is to format each query with keyspace name. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@squall0gd, yes, keep the keyspace in the query.
cassandra/client.go
Outdated
if err != nil { | ||
log.Fatal(err.Error()) | ||
} | ||
defer session.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't close out the session here. Leaving it open
@squall0gd, using my image was for the medium test to check if the Cassandra is up. It's not required. |
cassandra/cassandra.go
Outdated
connectionTimeoutRuleKey = "connectionTimeout" | ||
initialHostLookupRuleKey = "initialHostLookup" | ||
ignorePeerAddrRuleKey = "ignoreRuleKey" | ||
createKeyspaceRuleKey = "createKeyspace" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems that we don't need this property. The keyspace is default to snap
. Keyspace name suffices.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My case is that cassandra administrator give me access to cassandra instance with prepared keyspace for me. I cannot create any other keyspace, so even attempt is killing publisher flow.
cassandra/cassandra.go
Outdated
|
||
passwordRule, err := cpolicy.NewStringRule(passwordRuleKey, false, "") | ||
dontCreateKeyspaceRule, err := cpolicy.NewBoolRule(createKeyspaceRuleKey, false, true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove negation from option name (dont-
), as it's value is used directly further in the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, I've forgot about this one :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
LGTM |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@squall0gd, please address my feedback. Your dynamic table name implementation simply does not work now.
session, err := cluster.CreateSession() | ||
if err != nil { | ||
log.Fatal(err.Error()) | ||
} | ||
|
||
if err := session.Query(createKeyspaceCQL).Exec(); err != nil { | ||
log.Fatal(err.Error()) | ||
if co.createKeyspace { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove this createKeyspace flag. It's redundant. As the CQL itself already does this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No I cannot - it's required :/ if you try to create keyspace and you don't have permissions to your cassandra cluster then your snap task will fail b/c cassandra publisher will panic b/c gocql will exit with panic ;). This flag is simply to handle situation when you know that you don't have permissions.
If you've got permissions to create keyspace, then you're right, but you cannot simply predict if remote cluster give you required permissions or not
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@squall0gd, user should have privilege set inside Cassandra. we shouldn't use this flag.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@candysmurf you cannot expect that your client will have this permissions. And as far as we cannot do that, we need this flag to run tasks for enterprise ready cluster.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here you've got some proof of importance of this flag.
File task_w_creating_keyspace.yml
- emulates situation, when this flag isn't needed
File task_wo_creating_keyspace.yml
- prevents from creating keyspace.
As you can see creating keyspace is killing first task, while second task is up and running.
cassandra/client.go
Outdated
createTableCQL = "CREATE TABLE IF NOT EXISTS snap.metrics (ns text, ver int, host text, time timestamp, valType text, doubleVal double, strVal text, boolVal boolean, tags map<text,text>, PRIMARY KEY ((ns, ver, host), time),) WITH CLUSTERING ORDER BY (time DESC);" | ||
createTagTableCQL = "CREATE TABLE IF NOT EXISTS snap.tags (key text, val text, time timestamp, ns text, ver int, host text, valType text, doubleVal double, strVal text, boolVal boolean, tags map<text,text>, PRIMARY KEY ((key, val), time),) WITH CLUSTERING ORDER BY (time DESC);" | ||
createKeyspaceCQL = "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};" | ||
createTableCQL = "CREATE TABLE IF NOT EXISTS %s.metrics (ns text, ver int, host text, time timestamp, valType text, doubleVal double, strVal text, boolVal boolean, tags map<text,text>, PRIMARY KEY ((ns, ver, host), time),) WITH CLUSTERING ORDER BY (time DESC);" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the table name is always metrics
although the intent is to have user-specified table name but default to metrics
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch! Thanks :)
LGTM, @squall0gd, we can merge this PR after you squash all the commits. thanks. |
c62ff50
to
8c29070
Compare
Fixes #
Summary of changes:
port
- described heretimeout
- described hereconnectionTimeout
- described hereinitialHostLookup
- described hereignorePeerAddr
- described herekeyspace
- described herecreateKeyspace
- decide that keyspace should be created or not.snap
How to verify it:
Testing done:
TBD:
Unit testsIntegration testsA picture of a snapping turtle (not required but encouraged):
Before change:
If your Cassandra cluster is placed in private network with public endpoint and your administrator give you so little permissions(so you cannot even create your own namespace), you cannot use this plugin.
After change:
This PR brings additional options to configure connection to Cassandra cluster. Following options has been added: