Building Production Frameworks

Adam@ Alex@ Connor@

I got my tasks running.

What else is there to learn?

  • Roles, Reservations, Persistence
  • Scheduler HA, Reconciliation, State Abstraction
  • Status Updates, Framework Messages
  • Authentication, ACLs
  • DCOS Integration, 1.0 HTTP API

Agenda

  • Intro to Writing Frameworks
  • Framework Patterns
  • API Overview
  • Resource Allocation
  • Handling Failure
  • Security
  • Other Topics

Intro: The Core Dynamic

  • Receive resource offers
  • Make decisions (launch tasks, or decline)
  • Receive status updates
  • Update local view of the global state

Intro: Tools at Your Disposal

  • The State abstraction
  • Status updates
  • Custom executors
  • Framework messages

Framework Pattern: Resource Math

  • Tasks are typically launched on partial offers
  • Does this offer satisfy a task I want to run?
  • What's left of the offer after I launch this task?
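The two questions above can be sketched as plain arithmetic. This assumes offers and task requirements are reduced to dicts of scalar resources such as `cpus` and `mem`. Real offers carry `Resource` messages with scalars, ranges, sets and roles, so the helpers below (`satisfies`, `subtract`, `pack`, all hypothetical) only illustrate the idea.

```python
from typing import Dict, List, Tuple

# Scalar resources only, e.g. {"cpus": 1.0, "mem": 512.0}.
Resources = Dict[str, float]


def satisfies(offer: Resources, task: Resources) -> bool:
    """Does this offer (or what is left of it) have enough of everything the task needs?"""
    return all(offer.get(name, 0.0) >= amount for name, amount in task.items())


def subtract(offer: Resources, task: Resources) -> Resources:
    """What is left of the offer after launching the task (caller checks satisfies() first)."""
    remaining = dict(offer)
    for name, amount in task.items():
        remaining[name] = remaining.get(name, 0.0) - amount
    return remaining


def pack(offer: Resources, tasks: List[Resources]) -> Tuple[List[Resources], Resources]:
    """Greedily launch as many tasks as fit on one offer; return (launched, leftover)."""
    launched: List[Resources] = []
    leftover = dict(offer)
    for task in tasks:
        if satisfies(leftover, task):
            leftover = subtract(leftover, task)
            launched.append(task)
    return launched, leftover
```

Floating-point scalars can accumulate rounding error (for example, many 0.1-CPU tasks), so a production version may want to compare with a small tolerance.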

Framework Pattern: Tracking Task Metadata

  • Scheduler state is usually associated with TaskIDs
  • Update state upon receiving status updates
  • Metadata can be persisted via State
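A minimal sketch of task metadata keyed by TaskID. It assumes task IDs and states are plain strings. `TaskTracker`, `TaskMeta` and its `kind` field are hypothetical. `snapshot()` only produces JSON you might store through the State abstraction and does not call any Mesos API.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict

# Terminal task states (see the status update list in this deck).
TERMINAL = {"TASK_FINISHED", "TASK_FAILED", "TASK_KILLED", "TASK_LOST", "TASK_ERROR"}


@dataclass
class TaskMeta:
    slave_id: str
    kind: str                   # hypothetical application-level label, e.g. "worker"
    state: str = "TASK_STAGING"


class TaskTracker:
    def __init__(self) -> None:
        self.tasks: Dict[str, TaskMeta] = {}   # keyed by TaskID value

    def launched(self, task_id: str, meta: TaskMeta) -> None:
        self.tasks[task_id] = meta

    def on_status_update(self, task_id: str, state: str) -> None:
        meta = self.tasks.get(task_id)
        if meta is None:
            return  # unknown task; a real scheduler might reconcile or kill it
        meta.state = state
        if state in TERMINAL:
            del self.tasks[task_id]   # nothing left to track

    def snapshot(self) -> str:
        """JSON that could be written through the State abstraction."""
        return json.dumps({tid: asdict(m) for tid, m in self.tasks.items()}, sort_keys=True)
```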

Framework Pattern: Intermediate Results

  • Best accomplished with a custom executor, which can report them via:
    • Status updates (at-least-once delivery)
    • Framework messages (at-most-once delivery)
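A sketch of an executor reporting intermediate results over both channels. `FakeExecutorDriver` is a hypothetical stand-in that records calls named after the real driver's `sendStatusUpdate` and `sendFrameworkMessage`. Statuses here are plain dicts, whereas the real bindings take a `TaskStatus` protobuf whose `data` field carries arbitrary bytes. `run_job` and its summing workload are illustrative only.

```python
import json
from typing import Any, Dict, List


class FakeExecutorDriver:
    """Hypothetical stand-in that records what a real ExecutorDriver would send."""

    def __init__(self) -> None:
        self.status_updates: List[Dict[str, Any]] = []
        self.framework_messages: List[bytes] = []

    def sendStatusUpdate(self, status: Dict[str, Any]) -> None:
        self.status_updates.append(status)

    def sendFrameworkMessage(self, data: bytes) -> None:
        self.framework_messages.append(data)


def run_job(driver: FakeExecutorDriver, task_id: str, chunks: List[int]) -> None:
    total = 0
    for i, chunk in enumerate(chunks, start=1):
        total += chunk
        # Progress ticks can tolerate loss: at-most-once framework message.
        driver.sendFrameworkMessage(json.dumps({"task": task_id, "done": i}).encode())
        # Partial results the scheduler should not miss: at-least-once status update.
        driver.sendStatusUpdate({"task_id": task_id, "state": "TASK_RUNNING",
                                 "data": json.dumps({"partial": total}).encode()})
    driver.sendStatusUpdate({"task_id": task_id, "state": "TASK_FINISHED",
                             "data": json.dumps({"total": total}).encode()})
```

Progress ticks go over framework messages because losing one is harmless. Partial and final results go over status updates because those are retried until the scheduler acknowledges them.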

Framework Pattern: Gang Scheduling

  • N, M tasks that must be scheduled in parallel
  • Bounded offer hoarding
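One way to bound offer hoarding, sketched under simplifying assumptions:

  • A single scalar resource (`cpus`).
  • Identical tasks that cannot span offers.
  • An injected `clock`, so the timeout is testable.

`GangScheduler`, `Offer` and `max_hold` are hypothetical names. The returned action strings stand in for real `launchTasks`/`declineOffer` calls.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


@dataclass
class Offer:
    id: str
    cpus: float


class GangScheduler:
    """Hold offers until all n tasks fit at once, but never longer than max_hold seconds."""

    def __init__(self, n: int, cpus_per_task: float, max_hold: float,
                 clock: Callable[[], float]) -> None:
        self.n = n
        self.cpus_per_task = cpus_per_task
        self.max_hold = max_hold
        self.clock = clock
        self.held: List[Offer] = []
        self.first_held_at: Optional[float] = None

    def _slots(self) -> int:
        # A task cannot span offers, so count whole task slots per offer.
        return sum(int(o.cpus // self.cpus_per_task) for o in self.held)

    def _release(self) -> List[Offer]:
        released, self.held, self.first_held_at = self.held, [], None
        return released

    def on_offers(self, offers: List[Offer]) -> Tuple[str, List[Offer]]:
        """Return ("launch", offers), ("decline", offers) or ("wait", [])."""
        if offers and self.first_held_at is None:
            self.first_held_at = self.clock()
        self.held.extend(offers)
        if self._slots() >= self.n:
            return "launch", self._release()      # the whole gang fits: launch together
        if self.first_held_at is not None and self.clock() - self.first_held_at >= self.max_hold:
            return "decline", self._release()     # bound reached: give resources back
        return "wait", []
```

This version only checks the deadline when new offers arrive, and it launches on every held offer. A real scheduler would also:

  • Check the deadline on a timer.
  • Decline the offers it does not need.
  • Drop any offers the master rescinds while they are held.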

Framework Pattern: Fleet of Services

  • N distinct coordinated services
  • Use a state machine:
    • Bootstrap phase
    • Running phase
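A small sketch of such a state machine. It assumes two things:

  • During bootstrap, services must come up one at a time in a fixed order.
  • In the running phase, any terminal status simply means "relaunch".

`Fleet`, `Phase` and the service names used to exercise it are hypothetical.

```python
from enum import Enum
from typing import Dict, List, Optional

TERMINAL = {"TASK_FINISHED", "TASK_FAILED", "TASK_KILLED", "TASK_LOST", "TASK_ERROR"}


class Phase(Enum):
    BOOTSTRAP = "bootstrap"
    RUNNING = "running"


class Fleet:
    """N coordinated services: bring them up in order, then keep them all running."""

    def __init__(self, services: List[str]) -> None:
        self.order = services                                   # bootstrap order
        self.state: Dict[str, str] = {s: "PENDING" for s in services}
        self.phase = Phase.BOOTSTRAP

    def next_to_launch(self) -> Optional[str]:
        """Service to launch on the next suitable offer, or None to wait."""
        for s in self.order:
            if self.state[s] == "PENDING":
                return s
            if self.phase is Phase.BOOTSTRAP and self.state[s] != "TASK_RUNNING":
                return None   # bootstrap: wait until the previous service is up
        return None

    def launched(self, service: str) -> None:
        self.state[service] = "TASK_STAGING"

    def on_status(self, service: str, state: str) -> None:
        # Any terminal state means the service must be relaunched (a simplification).
        self.state[service] = "PENDING" if state in TERMINAL else state
        if self.phase is Phase.BOOTSTRAP and all(
                v == "TASK_RUNNING" for v in self.state.values()):
            self.phase = Phase.RUNNING
```

A scheduler would call these methods from its callbacks:

  • `next_to_launch()` from `resourceOffers`.
  • `launched()` after launching a task.
  • `on_status()` from `statusUpdate`.

Treating `TASK_ERROR` as relaunchable is a simplification. A malformed task relaunched unchanged will fail again.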

FrameworkInfo


message FrameworkInfo {
  required string user = 1;
  required string name = 2;
  optional FrameworkID id = 3;
  optional double failover_timeout = 4 [default = 0.0];
  optional bool checkpoint = 5 [default = false];
  optional string role = 6 [default = "*"];
  optional string hostname = 7;
  optional string principal = 8;
  optional string webui_url = 9;
}
        

Scheduler calls

  • Lifecycle management: Register, Reregister, Unregister
  • Resource allocation: Request, Decline, Revive
  • Task management: Launch, Kill, Reconcile
  • Communication: sendFrameworkMessage

Scheduler events

  • Lifecycle management: Registered, Reregistered
  • Resource allocation: Offers, Rescind
  • Task management: TaskStatus
  • Communication: frameworkMessage

Executor events

  • Lifecycle management: Registered, Reregistered, Disconnected, Shutdown
  • Task management: launchTask, killTask
  • Communication: frameworkMessage

Executor calls

  • Task management: sendStatusUpdate
  • Communication: sendFrameworkMessage

Resource Offers


message Offer {
  required OfferID id;
  required FrameworkID framework_id;
  required SlaveID slave_id;
  required string hostname;
  repeated Resource resources;
  repeated Attribute attributes;
  repeated ExecutorID executor_ids;
}
          

message Resource {
  required string name;
  required Value.Type type;
  optional Value.Scalar scalar;
  optional Value.Ranges ranges;
  optional Value.Set set;
  optional string role [default = "*"];
  optional DiskInfo disk;
}
          

Offers are consumed by launching tasks on them, declining them, or accepting them with operations


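import mesos.interface


class MyScheduler(mesos.interface.Scheduler):
    ...
    def resourceOffers(self, driver, offers):
        ...
        # Each offer can be used only once, by exactly one of the calls below.
        driver.launchTasks([offer.id for offer in offers], tasks, filters)
        ...
        driver.declineOffer(offer.id, filters)
        ...
        driver.acceptOffers([offer.id for offer in offers], operations, filters)
    ...
    def myCustomLogic(self, driver):
        ...
        driver.reviveOffers()  # drop filters set earlier, so offers flow again
        ...
        driver.killTask(task_id)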
          

Executor/TaskInfo describes how to run a task


message TaskInfo {
  required string name;
  required TaskID task_id;
  required SlaveID slave_id;
  repeated Resource resources;
  optional ExecutorInfo executor;
  optional CommandInfo command;
  optional ContainerInfo container;
  optional bytes data;
  optional HealthCheck health_check;
  optional Labels labels;
  optional DiscoveryInfo discovery;
}
        

message ExecutorInfo {
  required ExecutorID executor_id;
  optional FrameworkID framework_id;
  required CommandInfo command;
  optional ContainerInfo container;
  repeated Resource resources;
  optional string name;
  optional string source;
  optional bytes data;
  optional DiscoveryInfo discovery;
}
        

CommandInfo & friends describe what to run


message CommandInfo {
  repeated URI uris;
  optional Environment environment;
  optional string user;
  optional bool shell;
  optional string value;
  repeated string arguments;
  //deprecated ContainerInfo container;
}
message ContainerInfo {
  required Type type;
  repeated Volume volumes;
  optional DockerInfo docker;
}
        

message Volume {
  required string container_path;
  optional string host_path;
  required Mode mode; //RW,RO
}
message DockerInfo {
  required string image;
  optional Network network [default = HOST];
  repeated PortMapping port_mappings;
  optional bool privileged [default = false];
  optional bool force_pull_image;
  repeated Parameter parameters;
}
        

Handling Failures: Keeping the Scheduler up

  • Run your scheduler with a meta-framework
  • Re-register with the previous Framework ID
  • Pick up any persisted metadata
  • Perform task reconciliation
  • Continue as before
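Re-registering with the previous Framework ID has two preconditions:

  • The ID must survive the restart.
  • The master must keep the framework's tasks around long enough.

`FrameworkInfo.failover_timeout` controls the second; with its default of 0 there is no grace period. The sketch below persists the ID in a local JSON file. `FileStore` is a hypothetical stand-in for the State abstraction, which would normally sit on shared storage (e.g. ZooKeeper) so another host can take over. The dict only mirrors `FrameworkInfo` field names; it does not build the protobuf.

```python
import json
import os
from typing import Any, Dict, Optional


class FileStore:
    """Hypothetical stand-in for the State abstraction: a tiny JSON key/value file."""

    def __init__(self, path: str) -> None:
        self.path = path

    def _load(self) -> Dict[str, str]:
        if not os.path.exists(self.path):
            return {}
        with open(self.path) as f:
            return json.load(f)

    def get(self, key: str) -> Optional[str]:
        return self._load().get(key)

    def set(self, key: str, value: str) -> None:
        data = self._load()
        data[key] = value
        tmp = self.path + ".tmp"
        with open(tmp, "w") as f:
            json.dump(data, f)
        os.replace(tmp, self.path)  # atomic rename: readers never see a half-written file


def framework_info(store: FileStore) -> Dict[str, Any]:
    """FrameworkInfo fields as a plain dict (the real driver takes the protobuf)."""
    info: Dict[str, Any] = {
        "user": "nobody",                     # hypothetical
        "name": "my-framework",               # hypothetical; no spaces, for Mesos-DNS
        "failover_timeout": 7 * 24 * 3600.0,  # seconds the master keeps our tasks after we disconnect
        "checkpoint": True,                   # let slaves checkpoint, so tasks survive slave restarts
    }
    framework_id = store.get("framework_id")
    if framework_id is not None:
        info["id"] = framework_id             # re-register as the same framework
    return info


def on_registered(store: FileStore, framework_id: str) -> None:
    """Call from the scheduler's registered() callback to persist the new ID."""
    store.set("framework_id", framework_id)
```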

Framework Registration

Calls

  • Register
  • Re-register
  • Unregister

Events

  • Registered
  • Reregistered
  • Disconnected
  • Error

Handling Failures: Status Updates


class MyScheduler:
  ...
  def statusUpdate(self, driver, update):
    ...
    # Where update.state can be
    # TASK_STAGING = 6; // Initial state. Framework status updates should not use.
    # TASK_STARTING = 0;
    # TASK_RUNNING = 1;
    # TASK_FINISHED = 2; // TERMINAL. The task finished successfully.
    # TASK_FAILED = 3; // TERMINAL. The task failed to finish successfully.
    # TASK_KILLED = 4; // TERMINAL. The task was killed by the executor.
    # TASK_LOST = 5; // TERMINAL. The task failed but can be rescheduled.
    # TASK_ERROR = 7; // TERMINAL. The task description contains an error.
                

TASK_KILLED

  • Explicit (by the slave or executor, in response to a scheduler's request)
  • Implicit (the task is killed together with its framework, e.g. on failover timeout or master exiting)

TASK_ERROR

  • New state
  • Malformed task (bad user, non-unique task ID, bad slave ID, etc.)

TASK_FAILED

  • Validation OK
  • Failure prior to start or during execution (OOM event in executor, non-zero exit status, etc.)

TASK_LOST

  • Validation OK
  • Failure prior to start or during execution (slave is disconnected at launch time, task is unknown during reconciliation, slave removed, executor is terminating, etc.)

TASK_FAILED vs. TASK_LOST

Re-launching a FAILED task will most likely fail again, while re-launching a LOST task may succeed.
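The FAILED-versus-LOST distinction translates naturally into a retry policy. The sketch below is one hypothetical policy, with state names as strings and `max_failed_retries` an arbitrary cap:

  • Relaunch LOST tasks freely.
  • Retry FAILED tasks a bounded number of times.
  • Never blindly retry TASK_ERROR.

```python
# Terminal task states, per the status update list above.
TERMINAL = {"TASK_FINISHED", "TASK_FAILED", "TASK_KILLED", "TASK_LOST", "TASK_ERROR"}


def next_action(state: str, attempts: int, max_failed_retries: int = 3) -> str:
    """Return 'wait', 'done', 'relaunch' or 'give_up' for a task's latest state."""
    if state not in TERMINAL:
        return "wait"        # STAGING/STARTING/RUNNING: nothing to do yet
    if state in ("TASK_FINISHED", "TASK_KILLED"):
        return "done"        # success, or (usually) a kill we asked for
    if state == "TASK_ERROR":
        return "give_up"     # malformed task: relaunching it unchanged cannot succeed
    if state == "TASK_LOST":
        return "relaunch"    # infrastructure problem: likely to succeed elsewhere
    # TASK_FAILED: the task itself failed, so retry only a bounded number of times.
    return "relaunch" if attempts < max_failed_retries else "give_up"
```

Unbounded relaunching of LOST tasks can spin if, for example, a misbehaving slave keeps losing them. Adding a backoff or a global cap is a reasonable refinement.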


/** Describes the source of the task status update. */
enum Source {
  SOURCE_MASTER = 0;
  SOURCE_SLAVE = 1;
  SOURCE_EXECUTOR = 2;
}

enum Reason {
  REASON_COMMAND_EXECUTOR_FAILED = 0;
  REASON_EXECUTOR_TERMINATED = 1;
  REASON_EXECUTOR_UNREGISTERED = 2;
  REASON_FRAMEWORK_REMOVED = 3;
  REASON_GC_ERROR = 4;
  REASON_INVALID_FRAMEWORKID = 5;
  REASON_INVALID_OFFERS = 6;
  REASON_MASTER_DISCONNECTED = 7;
  REASON_MEMORY_LIMIT = 8;
  REASON_RECONCILIATION = 9;
  REASON_SLAVE_DISCONNECTED = 10;
  REASON_SLAVE_REMOVED = 11;
  REASON_SLAVE_RESTARTED = 12;
  REASON_SLAVE_UNKNOWN = 13;
  REASON_TASK_INVALID = 14;
  REASON_TASK_UNAUTHORIZED = 15;
  REASON_TASK_UNKNOWN = 16;
}

Framework Messages

Delivery guarantee

  • At-least-once (status updates)
  • At-most-once (framework messages)

Framework to executor

  • Scheduler creates FrameworkToExecutorMessage
  • Scheduler determines how to send it: directly to the slave or via master
  • [opt] Master performs various checks (e.g. slave is connected and registered)
  • [opt] Master forwards the message to the slave
  • Slave performs various checks (e.g. framework is not terminating and executor is running)
  • Slave forwards the message to executor

Executor to framework

  • Executor creates ExecutorToFrameworkMessage
  • Executor sends it to the slave
  • Slave performs various checks (e.g. framework is not terminating and slave is in RUNNING state)
  • Slave forwards the message to framework (scheduler)
  • Scheduler processes the message

Task Reconciliation


1: let start = now()
2: let remaining = { T ∈ tasks | T is non-terminal }
3: Perform reconciliation: reconcile(remaining)
4: Wait for status updates to arrive (truncated exponential backoff).
   For each update, note the time of arrival.
5: let remaining = { T ∈ remaining | T.last_update_arrival() < start }
6: If remaining is non-empty, go to 3.
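The reconciliation loop can be sketched as follows. Task IDs are assumed to be strings, and the `clock`, `sleep` and `reconcile` callables are injected so the loop runs without a cluster. In a real scheduler:

  • `reconcile` would wrap `driver.reconcileTasks(...)` with `TaskStatus` messages for the listed tasks.
  • `on_status_update` would be called from `statusUpdate`.
  • `run` would execute on its own thread, since blocking inside a driver callback is not allowed.

`Reconciler` and its parameters are hypothetical.

```python
import time
from typing import Callable, Dict, Iterable, List, Set


class Reconciler:
    """Explicit reconciliation with truncated exponential backoff; task IDs are strings."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self.clock = clock
        self.last_update_arrival: Dict[str, float] = {}

    def on_status_update(self, task_id: str) -> None:
        """Call from statusUpdate(): note when each update arrived."""
        self.last_update_arrival[task_id] = self.clock()

    def run(self, reconcile: Callable[[List[str]], None], non_terminal: Iterable[str],
            sleep: Callable[[float], None] = time.sleep,
            initial_backoff: float = 1.0, max_backoff: float = 60.0) -> None:
        start = self.clock()
        remaining: Set[str] = set(non_terminal)
        backoff = initial_backoff
        while remaining:
            reconcile(sorted(remaining))        # ask the master about these tasks
            sleep(backoff)                      # give status updates time to arrive
            # Keep only tasks with no update since reconciliation started.
            remaining = {t for t in remaining
                         if self.last_update_arrival.get(t, float("-inf")) < start}
            backoff = min(backoff * 2, max_backoff)  # exponential, truncated at max_backoff
```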
          

Framework Authentication


MesosSchedulerDriver(Scheduler*, FrameworkInfo, string master, Credential)
              

message Credential {
  required string principal = 1;
  optional bytes secret = 2;
}
              

Authorization and ACLs


message FrameworkInfo {
  ...
  optional string principal = 8;
  ...
}
              

mesos-master ... --acls='{
  "permissive" : false,
  "register_frameworks": [
    {
      "principals": { "values": ["HDFS"] },
      "roles": { "values": ["storage"] }
    }, {
      "principals": { "type": "ANY" },
      "roles": { "values": ["*"] }
    }
  ],
  "run_tasks": [
    {
      "principals": { "values": ["Marathon", "Chronos"] },
      "users": { "values": ["root"] }
    }
  ]}'
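The ACL JSON is full of double quotes, so wrapping it in double quotes on the command line breaks it. The sketch below builds the same ACL document as Python data and lets `shlex.quote` produce a shell-safe flag. `acls_flag` is a hypothetical helper, and the ACL contents are copied from the example above.

```python
import json
import shlex
from typing import Any, Dict

# The ACLs from the example above, as data: json.dumps always emits valid JSON.
ACLS: Dict[str, Any] = {
    "permissive": False,
    "register_frameworks": [
        {"principals": {"values": ["HDFS"]}, "roles": {"values": ["storage"]}},
        {"principals": {"type": "ANY"}, "roles": {"values": ["*"]}},
    ],
    "run_tasks": [
        {"principals": {"values": ["Marathon", "Chronos"]}, "users": {"values": ["root"]}},
    ],
}


def acls_flag(acls: Dict[str, Any]) -> str:
    """Render --acls=... with shell quoting that preserves the embedded double quotes."""
    return "--acls=" + shlex.quote(json.dumps(acls))
```

The resulting string can be appended to a `mesos-master ...` command line as-is.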
              

DCOS Integration

  • Able to launch Scheduler via Marathon (Dockerized?)
  • Publish package/config/marathon json to Mesosphere Universe
  • Mesos-DNS compatibility: No spaces in framework/task names
  • DCOS CLI integration: scale up/down, config, etc.

Mesos 1.0 HTTP API

  • /events endpoint
    • Schedulers POST registration data here
    • Expected to keep a persistent connection open
    • Replaces scheduler callback methods
    • Chunked-Transfer encoding, one chunk per update
  • /call endpoint
    • Replaces scheduler driver methods

Additional Advice

  • No blocking in sched/exec callbacks
  • SchedulerDriver* will not change, included as convenience
  • TaskIDs must be unique within the same frameworkId
  • No state in sandbox
  • No implicit deps outside sandbox, node constraints
  • Exec/task binaries/data via HDFS/etc, no public internet
  • Allow co-tenancy of multiple execs on same machine; No static ports
  • Know and publish (& verify?) Mesos version compatibility
  • Don't hardcode params that should be configurable
  • Pure bindings? Check out mesos-go, pesos, jesos
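Two pieces of the advice above can be shown in code:

  • Taking a port from the offer's `ports` ranges instead of hardcoding one.
  • Generating unique, space-free TaskIDs.

Ranges are modeled as inclusive `(begin, end)` tuples mirroring `Value.Range`. `take_port` and `new_task_id` are hypothetical helpers. The chosen port would still need to be declared in the task's `ports` resource and passed to the process (for example, through its environment).

```python
import uuid
from typing import List, Optional, Tuple

Range = Tuple[int, int]  # inclusive (begin, end), like Value.Range in a "ports" resource


def take_port(ranges: List[Range]) -> Optional[Tuple[int, List[Range]]]:
    """Take the lowest offered port; return it with the ranges that remain, or None."""
    for i, (begin, end) in enumerate(ranges):
        if begin <= end:
            rest = ranges[:i] + ([(begin + 1, end)] if begin < end else []) + ranges[i + 1:]
            return begin, rest
    return None


def new_task_id(prefix: str) -> str:
    """TaskIDs must be unique within the framework; a UUID suffix is one easy way."""
    return f"{prefix}.{uuid.uuid4()}"
```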