Skip to content

Instantly share code, notes, and snippets.

@juicedM3
Last active January 15, 2016 01:06
Show Gist options
  • Select an option

  • Save juicedM3/e67ef7a4ef0bf0c7407a to your computer and use it in GitHub Desktop.

Select an option

Save juicedM3/e67ef7a4ef0bf0c7407a to your computer and use it in GitHub Desktop.
cassandra-mesos shutdown script in Go. Uses ZooKeeper to help with discovery of the framework ID and app ID. If you do not want to shut down your instance of cassandra-mesos, DO NOT RUN THIS SCRIPT! It does not ask you to confirm your decision, it does not back up any data, and it will shut the framework down as long as it can discover all the necessary fields. My first Go script.
// Declared explicitly: this schema relies on proto2-only features
// (required fields and [default = ...] options).
syntax = "proto2";

package cassandramesosproto;

/**
 * Taken from the cassandra-mesos model and slightly modified so that it works here:
 * https://github.com/mesosphere/cassandra-mesos/blob/7a9898462a3acfb7f33101b15dc9cb9e4753e2fb/cassandra-mesos-model/src/main/proto/io/mesosphere/mesos/frameworks/cassandra/model.proto
 * Not everything unmarshals correctly yet, but the affected parts are not needed at this time.
 */
import "github.com/gogo/protobuf/gogoproto/gogo.proto";

// gogoproto code-generation switches, applied to every message in this file.
option (gogoproto.gostring_all) = true;
option (gogoproto.equal_all) = true;
option (gogoproto.verbose_equal_all) = true;
option (gogoproto.goproto_stringer_all) = false;
option (gogoproto.stringer_all) = true;
option (gogoproto.populate_all) = true;
option (gogoproto.testgen_all) = true;
option (gogoproto.benchgen_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
/**
 * Envelope that carries the framework description in field 3.
 * NOTE(review): the meaning of `node` and `zk` is not documented in this file -
 * confirm against the data that is actually stored.
 */
message CassandraFrameworkConfiguration {
  required string node = 1;
  required bytes zk = 2;

  /** The framework description itself. */
  required FrameworkInfo frameworkInfo = 3;
}
/**
 * ZooKeeper connection settings.
 * NOTE(review): field semantics are inferred from the names only (for example
 * the unit that `timeout` is measured in is presumably carried by `unit`) - confirm.
 */
message ZooKeeperState {
  required string servers = 1;
  required int64 timeout = 2;
  required bytes unit = 3;
  required string znode = 4;
  optional string scheme = 5;
  optional bytes credentials = 6;
}
/**
 * Description of one Cassandra-Mesos framework instance.
 */
message FrameworkInfo {
  /**
   * Framework ID.
   * Mesos generates it on the initial framework registration and hands it back
   * as part of Scheduler.registered. When set, the framework uses this value to
   * re-register with Mesos after a restart.
   */
  optional string frameworkId = 1;

  /**
   * Framework name - has to be unique per framework instance.
   * Not the same as the cluster name: with several Cassandra-Mesos frameworks,
   * one per logical data center (each with its own frameworkName and
   * frameworkId), the Cassandra cluster name is still shared.
   */
  required string frameworkName = 2;

  /**
   * Minimum interval, in seconds, between health check requests sent to each
   * executor for its Cassandra server task (= Cassandra process).
   */
  optional int64 healthCheckIntervalSeconds = 3;

  /**
   * Minimum interval, in seconds, between two starts of a Cassandra server task
   * (= Cassandra process).
   */
  optional int64 bootstrapGraceTimeSeconds = 4;

  /**
   * Globally shared mapping of the ports Cassandra needs.
   * Port kinds currently mapped, with their default values:
   * storage_port=7000, ssl_storage_port=7001, native_transport_port=9042, rpc_port=9160, jmx_port=7199
   */
  repeated PortMapping portMapping = 5;

  /**
   * Class name of the snitch in use. Defaults to GossipingPropertyFileSnitch.
   */
  optional string snitch = 6;

  /**
   * The default configuration role.
   */
  optional CassandraConfigRole defaultConfigRole = 7;

  /**
   * Target number of Cassandra nodes.
   */
  optional int32 targetNumberOfNodes = 8;

  /**
   * Target number of seed nodes.
   */
  optional int32 targetNumberOfSeeds = 9;

  /**
   * Placeholder field: the author of this modified schema notes that things
   * break without it.
   */
  optional string foo = 10;

  /** External DCs that run other rings of this cluster. */
  repeated ExternalDc externalDcs = 11;

  /**
   * Name of the Cassandra cluster. Corresponds to cluster_name in cassandra.yaml.
   */
  optional string clusterName = 12;
}
/**
 * An external DC that runs another ring of the same cluster.
 */
message ExternalDc {
  /** Name of the DC. */
  optional string name = 1;

  /** URL of the REST API. */
  optional string url = 2;

  /** Time at which seeds were last fetched. */
  optional int64 seedFetchTime = 3;

  /** The seeds that were fetched. */
  repeated string seeds = 4;
}
/**
 * A "config role" that individual nodes reference.
 * Only one role is used at present, but nodes may need different sizings - for
 * instance while old metal is being replaced by new metal - and different DCs
 * may be sized differently as well.
 */
message CassandraConfigRole {
  /**
   * Free-form name of this configuration - usually "default", but it could be
   * something like "16core-128g-4x500gSSD".
   */
  optional string name = 1;

  /**
   * Cassandra version string, e.g. 2.1.4
   */
  optional string cassandraVersion = 2;

  /**
   * Resources to provision.
   * Total memory must be greater than or equal to memJavaHeapMb + memAssumeOffHeapMb.
   * The ports field is unused in this context.
   */
  optional TaskResources resources = 3;

  /**
   * Amount of Java heap in MB.
   *
   * Cassandra memory usage falls into three categories:
   *   1. Java heap (MAX_HEAP_SIZE) - including new-gen (HEAP_NEWSIZE)
   *      Amount of Java heap in MB.
   *      (defaults to 50% of memMb, max 16384, if not present)
   *   2. Off-heap
   *      Amount of off-heap memory in MB.
   *      (defaults to memMb - memJavaHeapMb, if not present)
   *   3. OS (add as much as possible for the OS block cache)
   *      Simply the difference between memMb and memJavaHeapMb + memAssumeOffHeapMb.
   * A production-grade memory configuration should set all three of the above.
   * Remember to leave as much memory as possible to the OS block cache for
   * better performance.
   *
   * Off-heap structures (C* 2.1):
   *   - index-summary (default: 5% of the heap size)
   *       configured in cassandra.yaml - see index_summary_capacity_in_mb
   *       defaults to 5% of the heap size (may exceed)
   *   - key-cache (default: 5% of the heap size)
   *       configured in cassandra.yaml - see key_cache_size_in_mb
   *       defaults to 5% of the heap size
   *   - row-cache (default: off)
   *       configured in cassandra.yaml - see row_cache_size_in_mb (must be explicitly enabled in taskEnv)
   *       defaults to 0
   *   - counter-cache (default: min(2.5% of Heap (in MB), 50MB))
   *       configured in cassandra.yaml - see counter_cache_size_in_mb
   *       default: min(2.5% of Heap (in MB), 50MB) ; 0 means no cache
   *   - memtables (default on-heap)
   *       configured in cassandra.yaml - see file_cache_size_in_mb
   *       defaults to the smaller of 1/4 of heap or 512MB
   *       (NOTE(review): this entry cites the same setting as file-cache below - verify)
   *   - file-cache (default: min(25% of Heap (in MB), 512MB))
   *       configured in cassandra.yaml - see file_cache_size_in_mb
   *       defaults to the smaller of 1/4 of heap or 512MB
   *   - overhead during flushes/compactions/cleanup
   *       implicitly defined by the workload
   */
  optional int64 memJavaHeapMb = 4;

  /**
   * Assumed off-heap memory in MB; memJavaHeapMb has the details.
   */
  optional int64 memAssumeOffHeapMb = 5;

  /**
   * Additional configuration: the process environment.
   */
  optional TaskEnv taskEnv = 6;

  /**
   * The cassandra.yaml configuration.
   */
  optional TaskConfig cassandraYamlConfig = 7;

  /**
   * Mesos role used to receive resource offers.
   */
  optional string mesosRole = 8;

  /** Rack/DC info. */
  optional RackDc rackDc = 9;

  /**
   * Directory in which backups are stored.
   */
  optional string backupDirectory = 10;

  /**
   * Pre-defined data directory that tells Cassandra where to write its data.
   * NOTE:
   * This field will be removed once MESOS-1554 is released and the framework is
   * able to allocate the data volume itself.
   */
  optional string preDefinedDataDirectory = 999999;
}
/**
 * A single port mapping: a port name paired with its value.
 */
message PortMapping {
  /** Name of the port. */
  required string name = 1;

  /** Port value. */
  required int32 port = 2;
}
/**
 * The cluster's current status.
 * A top-level object in the state hierarchy.
 */
message CassandraClusterState {
  /**
   * Every known node.
   */
  repeated CassandraNode nodes = 1;

  /**
   * Metadata gathered while executors started up.
   */
  repeated ExecutorMetadata executorMetadata = 2;

  /**
   * Timestamp at which the most recent Cassandra server run task was submitted.
   */
  optional int64 lastServerLaunchTimestamp = 3;

  /**
   * IP addresses of the nodes that need to be replaced.
   */
  repeated string replaceNodeIps = 4;
}
/**
 * History of the most recent health checks received from all nodes.
 * A top-level object in the state hierarchy.
 */
message CassandraClusterHealthCheckHistory {
  /**
   * Configuration option: how many entries to record per node.
   */
  required int32 maxEntriesPerNode = 1;

  /**
   * The recorded health check history entries.
   */
  repeated HealthCheckHistoryEntry entries = 2;
}
/**
 * A time span during which the given executor was in one particular state.
 */
message HealthCheckHistoryEntry {
  /**
   * ID of the executor this entry was received from.
   */
  required string executorId = 1;

  /**
   * First occurrence of this entry.
   * Differs from timestampEnd when several similar HealthCheckDetails were received.
   */
  required int64 timestampStart = 2;

  /**
   * Last occurrence of this entry.
   * Differs from timestampStart when several similar HealthCheckDetails were received.
   */
  required int64 timestampEnd = 3;

  /**
   * The health check details belonging to this entry.
   */
  optional HealthCheckDetails details = 4;
}
/**
 * All known types of cluster job.
 */
enum ClusterJobType {
  CLEANUP  = 1;
  REPAIR   = 2;
  RESTART  = 3;
  BACKUP   = 4;
  RESTORE  = 5;
  TRUNCATE = 6;
}
/**
 * Holds the current cluster-wide job plus the last job status for each job type.
 * A top-level object in the state hierarchy.
 */
message CassandraClusterJobs {
  /**
   * The cluster-wide job that is executing right now.
   */
  optional ClusterJobStatus currentClusterJob = 1;

  /**
   * The last job status, one per job type.
   */
  repeated ClusterJobStatus lastClusterJobs = 2;
}
/**
 * Status of one cluster-wide job.
 */
message ClusterJobStatus {
  /**
   * Which kind of cluster-wide job this is.
   */
  required ClusterJobType jobType = 1;

  /**
   * Timestamp at which the cluster-wide job was started.
   */
  required int64 startedTimestamp = 2;

  /**
   * Timestamp at which the cluster-wide job finished.
   */
  optional int64 finishedTimestamp = 3;

  /**
   * Executor IDs of the nodes that have yet to execute the job.
   */
  repeated string remainingNodes = 4;

  /**
   * Per-node status of the nodes that have finished the job.
   */
  repeated NodeJobStatus completedNodes = 5;

  /**
   * Whether the cluster job should be, or has been, aborted.
   */
  optional bool aborted = 6;

  /**
   * Job status of the node that is executing at the moment.
   */
  optional NodeJobStatus currentNode = 7;

  /**
   * Name of the backup.
   */
  optional string backupName = 8;
}
/**
 * Status of a cluster-wide job on a single node; used inside ClusterJobStatus.
 */
message NodeJobStatus {
  /**
   * The node's executor ID.
   */
  required string executorId = 1;

  /**
   * Task ID of the node job.
   */
  required string taskId = 2;

  /**
   * Job type.
   */
  required ClusterJobType jobType = 3;

  /**
   * Timestamp at which the job was started on the node.
   */
  optional int64 startedTimestamp = 4;

  /**
   * Timestamp at which the job finished on the node.
   */
  optional int64 finishedTimestamp = 5;

  /**
   * Whether the job is still running on the node.
   */
  optional bool running = 6;

  /**
   * Whether the job failed on the node.
   */
  optional bool failed = 7;

  /**
   * Failure message; optional, applies when the failed field is true.
   */
  optional string failureMessage = 8;

  /**
   * Status information for each processed keyspace.
   */
  repeated ClusterJobKeyspaceStatus processedKeyspaces = 9;

  /**
   * Keyspaces that remain to be processed.
   */
  repeated string remainingKeyspaces = 10;
}
/**
 * Status of a single keyspace; used inside NodeJobStatus.
 */
message ClusterJobKeyspaceStatus {
  /**
   * Keyspace name.
   */
  required string keyspace = 1;

  /**
   * Status - its meaning depends on the job type.
   */
  required string status = 2;

  /**
   * Duration, in milliseconds.
   */
  required int64 duration = 3;
}
/**
 * The rack and DC of a node.
 */
message RackDc {
  /** Identifier of the rack. */
  optional string rack = 1 [default = "RAC1"];

  /** Identifier of the data center. */
  optional string dc = 2 [default = "DC1"];
}
/**
 * Describes a node.
 */
message CassandraNode {
  /**
   * Hostname as provided in the Mesos Offer message.
   */
  required string hostname = 1;

  /**
   * IP resolved by the scheduler from hostname as provided in the Mesos Offer message.
   */
  optional string ip = 2;

  /**
   * Information how to access JMX on that node.
   */
  optional JmxConnect jmxConnect = 3;

  /**
   * List of data volumes for the node.
   * Each DataVolume corresponds to a Cassandra JBOD data directory.
   */
  repeated DataVolume dataVolumes = 4;

  /**
   * Information about the executor for this node.
   */
  optional CassandraNodeExecutor cassandraNodeExecutor = 5;

  /**
   * Tasks submitted to this node.
   * The comment above this field used to be unterminated, which silently
   * commented the declaration out and swallowed the documentation of `seed`.
   */
  repeated CassandraNodeTask tasks = 6;

  /**
   * Flag whether the node is a seed node.
   */
  optional bool seed = 8;

  /**
   * Timestamp when the last repair ran on this node.
   */
  optional int64 lastRepairTimestamp = 9;

  /**
   * Timestamp when the last cleanup ran on this node.
   */
  optional int64 lastCleanupTimestamp = 10;

  /**
   * Indicates whether a node should run or not.
   */
  optional TargetRunState targetRunState = 11 [default = RUN];

  /**
   * The process ID of the Cassandra process.
   */
  optional int32 cassandraDaemonPid = 12;

  /**
   * IP of the node that this node is about to replace.
   */
  optional string replacementForIp = 13;

  /**
   * The required state of a node.
   */
  enum TargetRunState {
    /**
     * If the server task is not active, it is started.
     * Note that general bootstrap-grace-time settings and other checks (e.g. currently other nodes joining) apply.
     */
    RUN = 0;

    /**
     * If the server-task is active, a server-task shutdown is initiated.
     */
    STOP = 1;

    /**
     * If the server task is not active, the state is immediately set to RUN.
     * If the server-task is active, a server-task shutdown is initiated.
     * Note that general bootstrap-grace-time settings and other checks (e.g. currently other nodes joining) apply.
     */
    RESTART = 2;

    /**
     * Terminate all tasks including the executor itself. There's no way back for a terminated node.
     */
    TERMINATE = 3;
  }

  /**
   * The configuration files of this node need to be updated, but the Cassandra server process does not
   * need to be restarted.
   */
  optional bool needsConfigUpdate = 14;

  /** Rack/DC information */
  optional RackDc rackDc = 15;
}
/**
 * One data volume of a node.
 */
message DataVolume {
  /**
   * Path of the data volume (= Cassandra data directory).
   */
  required string path = 1;

  /**
   * Size, in MB.
   */
  optional int64 sizeMb = 2;
}
/**
 * Groups the resources that are to be provisioned for a task.
 */
message TaskResources {
  /**
   * How many CPU cores to provision.
   */
  optional double cpuCores = 1;

  /**
   * How much disk space to provision.
   */
  optional int64 diskMb = 2;

  /**
   * How much memory to provision.
   */
  optional int64 memMb = 3;

  /**
   * The port ranges to provision.
   */
  repeated int64 ports = 4;
}
/**
 * The executor process running on a node.
 */
message CassandraNodeExecutor {
  /**
   * Executor ID.
   */
  required string executorId = 1;

  /**
   * Source (= framework name).
   */
  required string source = 2;

  /**
   * Resources to provision for the executor process.
   */
  required TaskResources resources = 3;

  /**
   * Command and arguments of the executor process.
   */
  repeated string command = 7;

  /**
   * Environment of the executor process.
   */
  optional TaskEnv taskEnv = 8;

  /**
   * Required resources that have to be downloaded.
   */
  repeated FileDownload download = 5;
}
/**
 * A file that is to be downloaded.
 */
message FileDownload {
  /**
   * URL the file is downloaded from.
   */
  required string downloadUrl = 1;

  /**
   * Whether the file is executable.
   */
  optional bool executable = 2;

  /**
   * Whether the file should be extracted (for example .tar.gz bundles).
   */
  optional bool extract = 3;
}
/**
 * One task of a node.
 */
message CassandraNodeTask {
  /**
   * Task type.
   */
  required NodeTaskType type = 1;

  /**
   * Task ID.
   */
  required string taskId = 2;

  /**
   * Resources to provision for the task.
   */
  required TaskResources resources = 3;

  /**
   * Task details - these depend on taskType.
   */
  optional TaskDetails taskDetails = 4;

  enum NodeTaskType {
    /**
     * Ask the executor for some meta information.
     */
    METADATA = 1;

    /**
     * Start the Cassandra process.
     */
    SERVER = 2;

    /**
     * Start this node's share of a cluster-wide job.
     */
    CLUSTER_JOB = 3;

    /**
     * Update the Cassandra configuration files without (re)starting the Cassandra process.
     */
    CONFIG = 4;
  }

  /**
   * Task name.
   */
  optional string taskName = 5;
}
/**
 * Details of a task launched on a node, or of a task submitted through Mesos
 * framework messages.
 * Tasks are guaranteed either to execute or to fail with a status sent to the
 * scheduler. Framework messages behave much like UDP packets: delivery is not
 * guaranteed and messages may arrive out of order.
 */
message TaskDetails {
  /**
   * Task type.
   */
  required TaskDetailsType type = 1;

  /**
   * ??? (undocumented in the original schema)
   */
  optional ExecutorMetadataTask executorMetadataTask = 2;

  /**
   * Start the Cassandra process.
   */
  optional CassandraServerRunTask cassandraServerRunTask = 3;

  /**
   * Start a node's share of a cluster-wide job.
   */
  optional NodeJobTask nodeJobTask = 4;

  /**
   * Update the Cassandra configuration files.
   */
  optional UpdateConfigTask updateConfigTask = 5;

  enum TaskDetailsType {
    /**
     * ??? (undocumented in the original schema)
     * Via launchTask.
     */
    EXECUTOR_METADATA = 1;

    /**
     * Start the Cassandra process.
     * Via launchTask.
     */
    CASSANDRA_SERVER_RUN = 2;

    /**
     * Start a node's share of a cluster-wide job.
     * Via launchTask.
     */
    NODE_JOB = 3;

    /**
     * Ask the executor to report the status of its current node job to its scheduler.
     * Via framework message from scheduler to executor.
     */
    NODE_JOB_STATUS = 4;

    /**
     * Update the Cassandra configuration files.
     */
    UPDATE_CONFIG = 5;
  }
}
/**
 * ??? (purpose undocumented in the original schema)
 */
message ExecutorMetadataTask {
  /**
   * ??? (undocumented; presumably the executor's ID - confirm)
   */
  required string executorId = 1;

  /**
   * ??? (undocumented; presumably an IP address - confirm)
   */
  required string ip = 2;
}
/**
 * Task that starts the Cassandra process.
 */
message CassandraServerRunTask {
  /**
   * Version of Cassandra.
   */
  required string version = 1;

  /**
   * Command and arguments of the Cassandra server process.
   */
  repeated string command = 2;

  /**
   * Configuration of the Cassandra server.
   */
  required CassandraServerConfig cassandraServerConfig = 3;

  /**
   * How to connect to Cassandra's JMX server.
   */
  required JmxConnect jmx = 4;

  /**
   * Interval, in seconds, at which the executor should health-check the server process.
   */
  optional int64 healthCheckIntervalSeconds = 6;
}
/**
 * Executor task that updates the Cassandra configuration files.
 */
message UpdateConfigTask {
  /** The server configuration carried by this task. */
  required CassandraServerConfig cassandraServerConfig = 1;
}
/**
 * The information an executor task needs to update the Cassandra configuration files.
 */
message CassandraServerConfig {
  /**
   * The files that are required.
   */
  repeated TaskFile taskFiles = 1;

  /**
   * Environment of the Cassandra server process.
   */
  optional TaskEnv taskEnv = 2;

  /**
   * Details of the cassandra.yaml configuration.
   */
  required TaskConfig cassandraYamlConfig = 3;

  /** Rack/DC information. */
  optional RackDc rackDc = 4;
}
/**
 * Tells a node which kind of cluster job it should start.
 */
message NodeJobTask {
  /**
   * Type of the cluster job.
   */
  required ClusterJobType jobType = 1;

  /**
   * Directory for backups.
   */
  optional string backupDir = 3;
}
/**
 * How to connect to Cassandra's JMX server.
 */
message JmxConnect {
  /**
   * TCP port used by the JMX server. Usually equal to the default port in
   * CassandraFrameworkConfiguration.portMapping.
   */
  required int32 jmxPort = 1;

  /**
   * IP address the JMX server runs on - usually the IP address of the node.
   */
  required string ip = 2;

  // TODO add JMX auth params
}
/**
 * A file that has to be created or overwritten at outputPath with the supplied data.
 */
message TaskFile {
  /**
   * Path name of the target.
   */
  required string outputPath = 1;

  /**
   * Contents of the file.
   */
  required bytes data = 2;
}
/**
 * Configuration of one configuration file - usually cassandra.yaml.
 */
message TaskConfig {
  /**
   * One cassandra.yaml config item.
   */
  message Entry {
    /**
     * Name of the config item.
     */
    required string name = 1;

    /**
     * String value for cassandra.yaml.
     */
    optional string stringValue = 2;

    /**
     * Integer value for cassandra.yaml.
     */
    optional int64 longValue = 3;

    /**
     * List of string values for cassandra.yaml.
     */
    repeated string stringValues = 4;
  }

  /**
   * The config items.
   */
  repeated Entry variables = 1;
}
/**
 * A collection of environment variables.
 */
message TaskEnv {
  /**
   * One entry of the process environment.
   */
  message Entry {
    /**
     * Name of the environment variable.
     */
    required string name = 1;

    /**
     * Value of the environment variable.
     */
    required string value = 2;
  }

  /**
   * The environment variables.
   */
  repeated Entry variables = 1;
}
/**
 * Status details that the executor sends back to the scheduler - either as the
 * status update of a launched task or as a framework message answering a
 * status inquiry.
 */
message SlaveStatusDetails {
  /**
   * Which kind of status details this message carries.
   */
  required StatusDetailsType statusDetailsType = 1;

  /**
   * ??? (undocumented in the original schema)
   */
  optional ExecutorMetadata executorMetadata = 2;

  /**
   * Information on the Cassandra process that was launched.
   */
  optional CassandraServerRunMetadata cassandraServerRunMetadata = 3;

  /**
   * Information on an error.
   */
  optional SlaveErrorDetails slaveErrorDetails = 4;

  /**
   * Details of a health check (only via framework messages).
   */
  optional HealthCheckDetails healthCheckDetails = 5;

  /**
   * Status of a node job (only via framework messages).
   */
  optional NodeJobStatus nodeJobStatus = 6;

  enum StatusDetailsType {
    NULL_DETAILS         = 1;
    EXECUTOR_METADATA    = 2;
    CASSANDRA_SERVER_RUN = 3;
    ERROR_DETAILS        = 4;
    HEALTH_CHECK_DETAILS = 5;
    NODE_JOB_STATUS      = 6;
  }
}
/**
 * ??? (purpose undocumented in the original schema)
 */
message ExecutorMetadata {
  /**
   * ??? (undocumented; presumably the executor's ID - confirm)
   */
  required string executorId = 1;

  /**
   * ??? (undocumented; presumably an IP address - confirm)
   */
  optional string ip = 2;

  /**
   * The executor's ephemeral work directory.
   */
  required string workdir = 3;
}
/**
 * Information on the Cassandra process that was launched.
 */
message CassandraServerRunMetadata {
  /**
   * PID of the Cassandra process.
   */
  required int32 pid = 1;
}
/**
 * Error information that the executor sends to the scheduler as a status update.
 */
message SlaveErrorDetails {
  /**
   * The error message.
   */
  optional string msg = 1;

  /**
   * Details of the error, when available.
   */
  optional string details = 2;

  /**
   * Type of the error.
   */
  optional ErrorType errorType = 3;

  /**
   * The process's exit code.
   */
  optional int32 processExitCode = 4;

  enum ErrorType {
    /**
     * The Cassandra server process is not running although it is expected to be.
     */
    PROCESS_NOT_RUNNING = 1;

    /**
     * The Cassandra server process exited.
     */
    PROCESS_EXITED = 2;

    /**
     * Protobuf (de)serialization failed.
     */
    PROTOCOL_VIOLATION = 3;

    /**
     * Another node job is still running.
     */
    ANOTHER_JOB_RUNNING = 4;

    /**
     * The task could not be started.
     */
    TASK_START_FAILURE = 5;
  }
}
/**
 * Information from a health check.
 */
message HealthCheckDetails {
  /**
   * Whether the Cassandra process can be considered healthy.
   */
  optional bool healthy = 1;

  /**
   * May hold the reason why the Cassandra process is not considered healthy.
   */
  optional string msg = 2;

  /**
   * Further details; optional.
   */
  optional NodeInfo info = 3;
}
/**
 * Information about the Cassandra process, as inquired via JMX.
 */
message NodeInfo {
  /**
   * Cluster name.
   */
  optional string clusterName = 1;

  /**
   * The operation mode Cassandra reports.
   */
  optional string operationMode = 2;

  /**
   * Whether the node has joined the cluster.
   */
  optional bool joined = 3;

  /**
   * Whether the Thrift (RPC) server is running.
   */
  optional bool rpcServerRunning = 4;

  /**
   * Whether the native protocol server is running.
   */
  optional bool nativeTransportRunning = 5;

  /**
   * Whether gossip is initialized.
   */
  optional bool gossipInitialized = 6;

  /**
   * Whether gossip is running.
   */
  optional bool gossipRunning = 7;

  /**
   * JVM uptime, in milliseconds.
   */
  optional int64 uptimeMillis = 8;

  /**
   * The Cassandra host's ID.
   */
  optional string hostId = 9;

  /**
   * The Cassandra host's endpoint.
   */
  optional string endpoint = 10;

  /**
   * How many tokens this node manages.
   */
  optional int32 tokenCount = 11;

  /**
   * Data center this node belongs to.
   */
  optional string dataCenter = 12;

  /**
   * Rack this node belongs to.
   */
  optional string rack = 13;

  /**
   * Cassandra version string, as Cassandra returns it.
   */
  optional string version = 14;
}
# Generate the gogo-protobuf Go bindings for every .proto file in this directory.
# `all` names an action rather than a file, so it is declared phony: a stray
# file called "all" must not make the build look up to date.
.PHONY: all
all: *.proto
	protoc --proto_path=${GOPATH}/src:${GOPATH}/src/github.com/gogo/protobuf/protobuf:. --gogo_out=. *.proto
package main
/**
*
* TODO: Make sure node(s) exists.
* TODO: Command line overrides for Mesos Master & Marathon Leader.
* TODO: Read in /etc/mesos/zk for ZK Server list?
* TODO: Verify you want to shutdown the cassandra-mesos framework?
* TODO: Make sure servers are reachable?
* TODO: What to do if DestroyMarathonApp fails?
* When running multiple times, if the app is gone, it doesn't care.
* I don't think this is bad behavior.
* TODO: What to do if TeardownMesosFramework fails?
* Currently fails silently.
* TODO: Not all the cassandra-mesos ZK protobuf data is unmarshalled correctly. Would like it working 100% but not necessary for what I'm doing.
* DONE: Make sure node inputs begin with '/'.
* DONE: Mesos and Marathon authentication
*
*/
import (
"encoding/json"
"fmt"
"flag"
"log"
"net/http"
"net/url"
"os"
"sort"
"strings"
"time"
"github.com/samuel/go-zookeeper/zk"
"github.com/gogo/protobuf/proto"
"github.com/mesos/mesos-go/mesosproto"
"cassandramesosproto"
)
// GetMarathonServer returns the data stored in the first child (in sorted
// order) of zk_node; that child is treated as the Marathon leader.
//
// Failing to list the children, or finding none, ends the process through
// log.Fatal. Failing to read the chosen child panics.
func GetMarathonServer(c *zk.Conn, zk_node string) string {
	candidates, _, listErr := c.Children(zk_node)
	if listErr != nil {
		log.Fatal(fmt.Sprintf("%s %v", zk_node, listErr))
	}
	if len(candidates) == 0 {
		log.Fatal(fmt.Sprintf("Information about Marathon is not where it's expected: %s contains no children.", zk_node))
	}

	// Only the first child after sorting matters: it is the leader.
	sort.Strings(candidates)
	leaderPath := zk_node + "/" + candidates[0]

	payload, _, readErr := c.Get(leaderPath)
	if readErr != nil {
		panic(readErr)
	}
	return string(payload)
}
// GetMesosMaster walks the children of zk_node in sorted order and returns
// "hostname:port" taken from the first child whose data decodes as a
// mesosproto.MasterInfo. Each child is tried as JSON first (Mesos 0.24) and,
// if that fails, as protobuf (Mesos < 0.24).
//
// The process ends through log.Fatal when the children cannot be listed, when
// there are none, or when no child decodes. Failing to read a child panics.
func GetMesosMaster(c *zk.Conn, zk_node string) string {
	var mesosmaster string
	masterInfo := new(mesosproto.MasterInfo)
	children, _, err := c.Children(zk_node)
	if err != nil {
		log.Fatal(fmt.Sprintf("%s %v", zk_node, err))
	}
	if len(children) == 0 {
		e := fmt.Sprintf("Information about Mesos is not where it's expected: %s contains no children.", zk_node)
		log.Fatal(e)
	}
	sort.Strings(children)
	for _, name := range children {
		node := zk_node + "/" + name
		// Named getErr so the builtin `error` type is not shadowed.
		data, _, getErr := c.Get(node)
		fmt.Printf("node: %s\n", node)
		if getErr != nil {
			panic(getErr)
		}
		err = json.Unmarshal(data, masterInfo) // 0.24
		if err != nil {
			err = proto.Unmarshal(data, masterInfo) // <0.24
		}
		if err == nil {
			mesosmaster = fmt.Sprintf("%s:%d", masterInfo.GetHostname(), masterInfo.GetPort())
			// %+v, not "%v+": the latter printed a stray literal '+'.
			fmt.Printf("Mesos Master: %+v\n", masterInfo)
			break
		}
	}
	if len(mesosmaster) == 0 {
		e := fmt.Sprintf("Could not find Mesos Master within children (%s) at node %s.", children, zk_node)
		log.Fatal(e)
	}
	return mesosmaster
}
// GetFrameworkId reads zk_node, decodes it as a
// cassandramesosproto.CassandraFrameworkConfiguration and returns
// [2]string{framework ID, framework name}, with every '.' in the name
// replaced by '/'.
//
// cassandra-mesos has yet to catch up with Mesos 0.24 and store its ZK data as
// JSON, so the data is decoded as protobuf only.
//
// A decode error is tolerated as long as the framework ID could still be read;
// the file header notes that not everything in this schema unmarshals
// correctly. Previously the error check was inverted: the function only worked
// when decoding reported an error and panicked with a nil value when decoding
// succeeded.
//
// Panics when the znode cannot be read, or when decoding fails and no
// framework ID was recovered. Ends the process through log.Fatal when decoding
// succeeds but no framework ID is stored.
func GetFrameworkId(c *zk.Conn, zk_node string) [2]string {
	data, _, getErr := c.Get(zk_node)
	if getErr != nil {
		panic(getErr)
	}

	config := new(cassandramesosproto.CassandraFrameworkConfiguration)
	unmarshalErr := proto.Unmarshal(data, config)

	// The generated getters are nil-safe, so this chain works on partial data.
	id := config.GetFrameworkInfo().GetFrameworkId()
	name := config.GetFrameworkInfo().GetFrameworkName()

	if len(id) == 0 {
		if unmarshalErr != nil {
			panic(unmarshalErr)
		}
		// Without an ID the teardown request cannot target the framework.
		log.Fatal(fmt.Sprintf("No framework ID is stored at %s.", zk_node))
	}
	if unmarshalErr != nil {
		log.Printf("Warning: %s was only partially decoded (%v); continuing with the fields that were read.", zk_node, unmarshalErr)
	}

	fmt.Printf("Cassandra Framework Configuration %+v\n\n", config)

	// ASSUMPTION: Any '/' in the Marathon App ID gets translated to '.' for Mesos/ZK.
	// Other characters may be affected as well, so this converts it back.
	// Testing showed this does not always line up; there may be a better place
	// in ZK to get the app ID from.
	name = strings.Replace(name, ".", "/", -1)

	fmt.Printf("Framework ID: %s\n", id)
	fmt.Printf("Framework Name: %s\n", name)
	return [2]string{id, name}
}
// DestroyMarathonApp sends DELETE http://<server>/v2/apps/<app_id> to Marathon,
// the equivalent of
//
//	curl -X DELETE http://${MARATHON_MASTER}:8080/v2/apps/${CASSANDRA_MARATHON_ID}
//
// When the package-level marathonAuth ("user:password") is set it is placed in
// the URL as userinfo.
//
// Panics when the request cannot be built or sent. A non-2xx answer is only
// logged: running the script again after the app is gone must keep working.
func DestroyMarathonApp(server string, app_id string) {
	fmt.Printf("Destroying Marathon App %s.\n", app_id)
	uri := "http://"
	if len(marathonAuth) > 0 {
		uri += marathonAuth + "@"
	}
	uri += server + "/v2/apps/" + app_id
	req, err := http.NewRequest("DELETE", uri, nil)
	if err != nil {
		panic(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Surface rejected requests instead of ignoring them. The URI is left out
	// of the message on purpose: it may contain credentials.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		log.Printf("Warning: Marathon answered %q when destroying app %s.", resp.Status, app_id)
	}
}
// TeardownMesosFramework asks the Mesos master to shut a framework down by
// POSTing the form field frameworkId=<id>, the equivalent of
//
//	curl -d "frameworkId=${FRAMEWORK_ID}" http://${MESOS_MASTER}:5050/master/teardown
//
// The endpoint depends on the package-level mesosVersion:
//   - /master/teardown for > 0.23
//   - /master/shutdown for <= 0.23
//
// When the package-level mesosAuth ("user:password") is set it is placed in
// the URL as userinfo.
//
// Panics when the request cannot be sent. A non-2xx answer is logged rather
// than ignored, so a failed teardown is visible to the operator.
func TeardownMesosFramework(server string, id string) {
	uri := "http://"
	fmt.Printf("Shutting down Mesos Framework %s.\n", id)
	data := url.Values{}
	data.Add("frameworkId", id)
	if len(mesosAuth) > 0 {
		uri += mesosAuth + "@"
	}
	if mesosVersion >= 0.24 {
		uri += server + "/master/teardown" // 0.24
	} else {
		uri += server + "/master/shutdown" // 0.23
	}
	resp, err := http.PostForm(uri, data)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The URI is left out of the message on purpose: it may contain credentials.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		log.Printf("Warning: Mesos answered %q when tearing down framework %s.", resp.Status, id)
	}
}
// RemoveZookeeperNode deletes every direct child of node and then node itself.
//
// There is no recursive delete like zkCli's rmr here, so this assumes the
// cassandra-mesos ZK structure stays one level deep. All deletes pass version
// -1. Any ZooKeeper error panics.
func RemoveZookeeperNode(c *zk.Conn, node string) {
	kids, _, listErr := c.Children(node)
	if listErr != nil {
		panic(listErr)
	}

	// Children first; grandchildren are assumed not to exist.
	for i := range kids {
		if delErr := c.Delete(node+"/"+kids[i], -1); delErr != nil {
			panic(delErr)
		}
	}

	// Then the parent.
	if delErr := c.Delete(node, -1); delErr != nil {
		panic(delErr)
	}
}
func GetUserInput(opts []string) string {
fmt.Println("Select cassandra-mesos node to shutdown:")
fmt.Printf("\t[%d] %s\n", 0, "Exit")
for i,name := range opts {
fmt.Printf("\t[%d] %s\n", i+1, name)
}
var selected int
for {
fmt.Print("--> ")
_, err := fmt.Scanf("%d", &selected)
// go to make sure selection isn't > len(opts).
if err == nil && selected <= len(opts) {
break
}
}
if selected == 0 {
os.Exit(0)
}
selectedNode := "/" + opts[int(selected)-1]
fmt.Printf("Selected cassandra-mesos node [%d] to shutdown: %s\n", int(selected)-1, selectedNode)
return selectedNode
}
// checkZKNodePrefix ends the process through log.Fatal unless node starts with
// "/". app only names the application in the error message.
func checkZKNodePrefix(app string, node string) {
	if strings.HasPrefix(node, "/") {
		return
	}
	log.Fatal(fmt.Sprintf("ZK node for %s (%s) must begin with a '/'", app, node))
}
// Command-line settings; init() binds each of them to the flag named alongside.
var (
	zkHost          string  // -zk
	cassandraZkNode string  // -node
	mesosZkNode     string  // -mesosNode
	marathonZkNode  string  // -marathonNode
	frameworkId     string  // -frameworkId
	mesosVersion    float64 // -mesosVersion
	mesosAuth       string  // -mesosAuth
	marathonAuth    string  // -marathonAuth
)
// init registers the command-line flags, parses them and prints the remaining
// positional arguments.
func init() {
	// All string flags share one shape, so they are registered from a table.
	stringFlags := []struct {
		target *string
		name   string
		value  string
		usage  string
	}{
		{&zkHost, "zk", "127.0.0.1", "ZooKeeper Host"},
		{&cassandraZkNode, "node", "", "cassandra-mesos ZK node. Must start with '/'. If not provided assumes any node that matches 'cassandra'."},
		{&mesosZkNode, "mesosNode", "/mesos", "Override for Mesos ZK node. Must start with '/'."},
		{&marathonZkNode, "marathonNode", "/marathon", "Override for Marathon ZK node. Must start with '/'."},
		{&frameworkId, "frameworkId", "", "Override for Mesos framework id."},
		{&mesosAuth, "mesosAuth", "", "Mesos authentication. Format 'user:password'."},
		{&marathonAuth, "marathonAuth", "", "Marathon HTTP authentication. Format 'user:password'."},
	}
	for _, f := range stringFlags {
		flag.StringVar(f.target, f.name, f.value, f.usage)
	}
	flag.Float64Var(&mesosVersion, "mesosVersion", 0.24, "Override for Mesos major version. Protobuf (0.23) vs JSON (0.24).")

	flag.Parse()
	fmt.Printf("Args %+v.\n", flag.Args())
}
// main shuts a cassandra-mesos framework down:
//  1. connect to ZooKeeper at zkHost,
//  2. look up the Mesos master and the Marathon leader in ZooKeeper,
//  3. determine the cassandra-mesos znode (the -node flag, or by scanning "/"
//     for children containing "cassandra" and asking the user when that is
//     ambiguous),
//  4. unless -frameworkId was given, read the framework ID and name from ZK and
//     destroy the Marathon app,
//  5. tear the Mesos framework down and delete the cassandra-mesos znode.
//
// There is no confirmation step and no backup.
func main() {
	c, _, err := zk.Connect([]string{zkHost}, time.Second)
	if err != nil {
		e := fmt.Sprintf("Could not connect to ZK host %s: %s", zkHost, err)
		log.Fatal(e)
	}
	// Registered right after connecting so the connection is closed on every
	// return and panic path. It used to be the last statement of main, where it
	// covered nothing. Deferred calls still do not run on log.Fatal / os.Exit.
	defer c.Close()

	checkZKNodePrefix("mesos", mesosZkNode)
	checkZKNodePrefix("marathon", marathonZkNode)

	mesosmaster := GetMesosMaster(c, mesosZkNode)
	fmt.Printf("\nMesos Master %s\n", mesosmaster)
	marathon_server := GetMarathonServer(c, marathonZkNode+"/leader")
	fmt.Printf("Marathon Server %s\n\n", marathon_server)

	if len(cassandraZkNode) == 0 {
		var opts []string
		children, _, err := c.Children("/")
		if err != nil {
			panic(err)
		}
		fmt.Printf("Children %+v\n", children)
		for _, element := range children {
			if strings.Contains(element, "cassandra") {
				fmt.Printf("\tChild %+s\n", element)
				opts = append(opts, string(element))
			}
		}
		if len(opts) == 0 {
			// No child contains the word 'cassandra', so offer the user all children.
			cassandraZkNode = GetUserInput(children)
		} else if len(opts) == 1 {
			// Exactly one child matches 'cassandra'; assume it is the one to delete.
			cassandraZkNode = "/" + opts[0]
		} else {
			// Several children match 'cassandra'; let the user choose.
			cassandraZkNode = GetUserInput(opts)
		}
	} else {
		checkZKNodePrefix("cassandra", cassandraZkNode)
	}

	// NOTE(review): when -frameworkId is supplied the Marathon app is not
	// destroyed, because the app ID is only known from the ZK lookup. Confirm
	// that this is intended.
	if len(frameworkId) == 0 {
		fw := GetFrameworkId(c, cassandraZkNode+"/CassandraFrameworkConfiguration")
		frameworkId = fw[0]
		fmt.Printf("\t\tFW: %+s\n", fw[0])
		DestroyMarathonApp(marathon_server, fw[1])
	}
	TeardownMesosFramework(mesosmaster, frameworkId)
	RemoveZookeeperNode(c, cassandraZkNode)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment