Big data pipeline processing using Amazon ECS

This is the first of two articles in which I will show how to use Amazon Elastic Container Service (ECS) to create a big data processing pipeline capable of responding autonomously to load changes. This first part shows, in general terms, how to create a cluster using on-demand instances and, in part, spot instances; in the next article I will explain how to use spot instances exclusively for the EC2 machines, and the tools we had to build to handle the added management complexity.

In the project that inspired these articles, our goal was to process approximately 7 TB of data per day, reading it from various Amazon Kinesis streams spread over 5 AWS regions and compressing it into small zstd packages to be uploaded to an S3 bucket. The system we designed had to replace an existing Apache Storm version that had various stability problems and, in doing so, we were asked to achieve substantial cost savings.

We first replaced the application with one written in Go, which immediately allowed us to significantly decrease the number and size of the servers needed. We also decided to use ECS to run the service, packaging the Go applications in Docker containers and leaving room for manoeuvre to increase or decrease the servers as the workload changed. Furthermore, we built not just one cluster but a cluster in each region we had to read data from. In this way we could compress the data directly in the source region, saving a lot of bandwidth when uploading the files to the destination S3 bucket (which is in Oregon). Consider that zstd compresses our data between 20 and 30 times and that, for example, transferring data from AWS Tokyo to another region costs $0.09/GB. Doing some quick maths, this allowed us to go from an average data transfer cost of $180/day to around $9/day…
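To make the saving concrete, here is the back-of-the-envelope arithmetic, using the Tokyo rate quoted above on a purely illustrative 1 TB of data:

1,000 GB × $0.09/GB                       ≈ $90.00  (uncompressed)
1,000 GB ÷ 20 =  50 GB  →  50 × $0.09/GB  ≈ $4.50   (20× compression)
1,000 GB ÷ 30 ≈  33 GB  →  33 × $0.09/GB  ≈ $3.00   (30× compression)

Scaled to our daily volumes, that is roughly the difference between $180/day and $9/day.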

Last but not least, we moved to the exclusive use of spot instances instead of on-demand ones. I will dedicate the next blog post entirely to this theme, but I can already say that it allowed us to save around 65%. Not bad 🙂

Let’s create an ECS cluster together

To manage the AWS configuration we used Terraform, a HashiCorp product for configuring cloud infrastructure through code. It is an excellent DevOps tool for applying reproducible configurations to AWS infrastructure without getting lost in the CLI, the thousand web UIs and the other, always very heterogeneous, tools. I should say upfront that I will not show all the necessary configuration, only a summary of it, to avoid digressing too far from the topic of this post. In particular, I will not cover the IAM permissions, without which the configurations shown cannot be applied; I plan to return to the subject in a future article.
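For context, all the snippets below assume that a Terraform AWS provider has already been configured. A minimal sketch could look like this (the region is just an example, and a multi-region setup like ours needs one provider block per region):

provider "aws" {
  # Region of the cluster being configured
  region = "us-west-2"
}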

A brief introduction to EC2 instance types

Amazon Elastic Compute Cloud (Amazon EC2) is the AWS service that provides computing capacity in the cloud. This capacity comes from machines of very different sizes (from 1 vCPU and 2 GB of RAM up to hundreds of vCPUs and tens of TB of RAM) and, consequently, very different prices. Each instance type can be purchased in two main ways: reserved or on-demand. On-demand instances provide great flexibility, as they can be started and stopped on request (even for just a few minutes), but they are expensive. Reserved instances, instead, offer lower prices but must be purchased for 1 or 3 years. There is a third option, which I mentioned in the introduction, namely the spot market, but we will explore that topic in greater depth in the next post.

The ECS cluster

Amazon Elastic Container Service (ECS) is the product in the AWS family used to orchestrate clusters of Docker containers, obviously integrated with the Amazon Web Services ecosystem. It is an excellent solution for managing dynamic workloads, whether for a product aimed at the web public or, as in our case, for a high-performance processing pipeline, since the resources in use can be adjusted immediately to match the current load. The containers can run on Fargate (which we will not address here), a system that removes the need to deal with the machines the containers run on, or on EC2 machines, which can be reserved, on-demand or spot. At an operational level, an ECS cluster groups a set of services, each in turn consisting of one or more tasks, that is, running instances of Docker containers.

To create an example cluster with a service that runs two copies of a task (i.e. two identical Docker containers), spread across distinct instances, this is the necessary Terraform configuration:

resource "aws_ecs_cluster" "my_cluster" {
  name = "my_cluster"
}

resource "aws_ecs_task_definition" "my_task" {
  family                = "task_name"
  container_definitions = data.template_file.my_task_definition.rendered
}

resource "aws_ecs_service" "my_service" {
  name            = "service_name"
  cluster         = aws_ecs_cluster.my_cluster.id
  task_definition = aws_ecs_task_definition.my_task.arn
  desired_count   = 2
  launch_type     = "EC2"
  ordered_placement_strategy {
    type  = "spread"
    field = "instanceId"
  }
}

There are a few things to note in the next few lines, which render the container definitions from a template file:

data "template_file" "my_task_definition" {
  template = "${file("${path.module}/task-definition.json")}"
  vars = {
    task_name         = "my_task"
    image             = "<Docker image in ECR>"
    memoryReservation = 128
    cpu               = 9
  }
}

The variables passed to the definition can be used inside the task definition JSON file:

[
    {
      "essential": true,
      "cpu": ${cpu},
      "memoryReservation": ${memoryReservation},
      "image": "${image}",
      "name": "${task_name}"
    }
  ]

There are many options for the task definition, but managing CPU and memory is particularly important and not trivial, requiring some calculation. Personally, I used the strategy suggested by this article, namely: start with values that seem sensible, then adjust.
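As a reference for that tuning, a container definition can declare both a soft memory limit (memoryReservation, which ECS uses to place the task) and a hard one (memory, above which the container is killed). Here is a minimal sketch with purely illustrative values, written inline with jsonencode instead of a separate template file:

resource "aws_ecs_task_definition" "my_task_inline" {
  family = "task_name"
  container_definitions = jsonencode([
    {
      name              = "my_task"
      image             = "<Docker image in ECR>"
      essential         = true
      cpu               = 128 # CPU units: 1024 = 1 vCPU
      memoryReservation = 128 # soft limit in MiB, used for placement
      memory            = 256 # hard limit in MiB
    }
  ])
}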

After this configuration, the ECS cluster is ready (*) to run the configured services. You simply need to “hook” one or more EC2 instances to it and the Docker containers will be launched automatically.

(*) As stated above, I am only showing extracts of the configuration. To make it functional, other sections will need to be added.

The configuration of EC2 instances

Launch template

The launch template is a set of configurations to be applied when an EC2 instance is launched, so that they do not all have to be specified at launch time:

resource "aws_launch_template" "my_launch_template" {
  name      = "launch_template_name"
  image_id  = "<ami-id>"
  user_data = base64encode(data.template_file.userdata.rendered)
  ...
}

data "template_file" "userdata" {
  template = "${file("${path.module}/user_data.sh")}"
  vars = {
    cluster_name = "my_cluster"
  }
}

I have reduced the file to the essential configuration, although many more options are available. I recommend a careful read of the official documentation.

In addition to the name, I have included the image_id parameter, which defines which AMI image to use to launch the instances, and user_data, a script (bash in my case) that is executed when an instance starts.

As for the AMI, it is obviously possible to use one of those provided by Amazon or by third parties. Initially that was our choice too, but after a while the script in user_data had become very long and, as a consequence, instance start-up had become slow. We therefore decided to create a custom version of a base AMI with Packer, moving most of the start-up script instructions to that stage.

I will discuss Packer in a future post.  For the moment here is an essential user_data script:

#!/bin/bash
set -eu

# Configure ECS
mkdir -p /etc/ecs
echo ECS_CLUSTER=${cluster_name} >> /etc/ecs/ecs.config
echo ECS_CONTAINER_STOP_TIMEOUT=120s >> /etc/ecs/ecs.config
echo ECS_ENABLE_SPOT_INSTANCE_DRAINING=true >> /etc/ecs/ecs.config

# Install ECS stuff
yum install -y ecs-init

# Add ssh user to docker group
usermod -a -G docker ec2-user
service docker restart
start ecs
sleep 90

# Update ECS agent image
docker stop ecs-agent
docker rm ecs-agent
docker pull amazon/amazon-ecs-agent:latest
start ecs

# More stuff ...

The script configures the machine to join the correct ECS cluster, using the cluster_name variable passed in from the Terraform template. The two subsequent settings, ECS_CONTAINER_STOP_TIMEOUT and ECS_ENABLE_SPOT_INSTANCE_DRAINING, are useful for the correct operation of the cluster with spot instances; I will return to them in the next blog post. There are many other settings that can go in the /etc/ecs/ecs.config file; for those, see the official guide.

EC2 Fleet

EC2 Fleet is a way to define a fleet of instances (reserved, on-demand or spot) that use a launch template, deciding which instance types to use, the availability zones, and so on.

Below is a relatively simple configuration that mixes spot and on-demand instances. Given the length of the file, I have added the comments directly in the code:

resource "aws_ec2_fleet" "my_ec2_fleet" {
  # This is a Terraform lifecycle setting that defines how to manage the resource. In this case,
  # when a change requires the fleet to be deleted and recreated, the new version is created
  # first and only then is the old one deleted. For the fleet this is very useful, as deletion
  # takes several minutes; creating the new resource first avoids unnecessary downtime of the
  # service.

  lifecycle {
    create_before_destroy = true
  }

  terminate_instances_with_expiration = false
  terminate_instances                 = true
  replace_unhealthy_instances         = true
  type                                = "maintain"

  launch_template_config {
    # Let's use the launch template defined before, in its latest version
    launch_template_specification {
      launch_template_id = aws_launch_template.my_launch_template.id
      version            = aws_launch_template.my_launch_template.latest_version
    }
    # I define overrides with respect to the launch template. I choose 3 different instance types,
    # giving each a weight (based on the resources it provides) and a priority. For some I also
    # define a maximum price that I am willing to pay on the spot market; where no price is
    # defined, it can at most reach the on-demand price of the same instance type
    override {
      instance_type     = "c4.large"
      weighted_capacity = 1
      priority          = 2
    }

    override {
      instance_type     = "c4.xlarge"
      weighted_capacity = 2
      priority          = 1
      max_price         = 0.10
    }

    override {
      instance_type     = "c4.2xlarge"
      weighted_capacity = 4
      priority          = 0
      max_price         = 0.20
    }
  }

  # For on-demand instances I choose the lowest-cost ones; for the spot ones, instead,
  # I prefer to diversify, to minimise the number of interruptions

  on_demand_options {
    allocation_strategy = "lowestPrice"
  }
  spot_options {
    allocation_strategy = "diversified"
    instance_interruption_behavior = "terminate"
  }

  # I define how to distribute the load between spots and on-demand
  target_capacity_specification {
    default_target_capacity_type = "spot"
    total_target_capacity     = 3
    on_demand_target_capacity = 1
    spot_target_capacity      = 2
  }
}

In this configuration it is important to weigh up the capacity weights and the cost management carefully, so I recommend reading two resources: the official guide and an interesting blog post.
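To make the weights more concrete, this is how the arithmetic works out with the values above (one possible allocation; EC2 Fleet chooses the actual mix according to the allocation strategies):

total_target_capacity = 3, of which on_demand = 1 and spot = 2

c4.large    →  weighted_capacity 1  →  each instance fulfils 1 unit
c4.xlarge   →  weighted_capacity 2  →  each instance fulfils 2 units
c4.2xlarge  →  weighted_capacity 4  →  each instance fulfils 4 units

possible fulfilment: 1 × c4.large on-demand (1 unit) + 1 × c4.xlarge spot (2 units) = 3 units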

As mentioned, this configuration is a mixture of spot and on-demand. In the next article I will explain how to adjust these values to use spot instances only.

Autoscaling group (aka ASG)

The autoscaling function is used to adapt the available resources when consumption changes. In our case we scale the number of instances available to the cluster without changing the number of tasks in the service. This aspect depends very much on the type of application being built: for a web application it may also make sense to scale the tasks, in order to lighten the load on web servers serving an excessive number of users. In our somewhat special case, we needed to distribute a constant number of containers across the machines; I will go into more detail on this aspect, and the solutions we applied, further on.
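For completeness (it is not what we did in this pipeline), scaling the number of tasks in a service is normally done with Application Auto Scaling. A minimal sketch, in which the capacity limits and the 70% CPU target are purely illustrative, could be:

resource "aws_appautoscaling_target" "my_service_scaling" {
  service_namespace  = "ecs"
  resource_id        = "service/${aws_ecs_cluster.my_cluster.name}/${aws_ecs_service.my_service.name}"
  scalable_dimension = "ecs:service:DesiredCount"
  min_capacity       = 2
  max_capacity       = 8
}

resource "aws_appautoscaling_policy" "my_service_cpu" {
  name               = "my-service-cpu-tracking"
  service_namespace  = aws_appautoscaling_target.my_service_scaling.service_namespace
  resource_id        = aws_appautoscaling_target.my_service_scaling.resource_id
  scalable_dimension = aws_appautoscaling_target.my_service_scaling.scalable_dimension
  policy_type        = "TargetTrackingScaling"

  target_tracking_scaling_policy_configuration {
    # Keep the service's average CPU around the target by adding or removing tasks
    predefined_metric_specification {
      predefined_metric_type = "ECSServiceAverageCPUUtilization"
    }
    target_value = 70
  }
}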

Returning to the autoscaling group, we can start by defining the group and the notification triggers:

resource "aws_autoscaling_group" "my_asg" {
  name     = "my_asg"
  max_size = 4
  min_size = 0
  termination_policies = ["OldestInstance", "OldestLaunchTemplate"]
  default_cooldown = 900 # 15 min
  mixed_instances_policy {
    launch_template {
      launch_template_specification {
        launch_template_id = aws_launch_template.my_launch_template.id
        version            = "$Latest"
      }
      override {
        instance_type = "m5.xlarge"
      }
      override {
        instance_type = "c5.xlarge"
      }
      ...
    }
    instances_distribution {
      on_demand_base_capacity                  = 0
      on_demand_percentage_above_base_capacity = 0
      spot_allocation_strategy                 = "lowest-price"
      spot_instance_pools                      = 4
    }
  }
}

In this configuration section, the autoscaling group is defined, indicating the desired minimum and maximum number of instances, the launch template to use and the preferred instance types. I have indicated two types but, since this is an exclusively spot setup (note the instances_distribution configuration), I suggest providing a wider list, to avoid ending up with no instances available. The only caveat is to use instances with comparable resources, otherwise the application could significantly change its behaviour (for example if CPU or memory is halved or doubled).

default_cooldown is set to a fairly high value, 15 minutes: this parameter defines how much time must pass between one autoscaling action and the next. Since our application takes some time to settle to a semi-constant resource consumption, this high value prevents the ASG from starting and then stopping instances before consumption stabilises. There is no need to dwell on termination_policies now; it is a very important configuration that will be addressed in detail in the paragraph “Replacing an instance without downtime”.

After the basic setup of autoscaling, we configure the scaling policies:

resource "aws_autoscaling_policy" "my_asg_autoscale_up" {
  name                   = "my-asg-autoscale-up"
  autoscaling_group_name = aws_autoscaling_group.my_asg.name
  adjustment_type        = "ChangeInCapacity"
  policy_type            = "StepScaling"

  estimated_instance_warmup = 420 # 7 minutes
  metric_aggregation_type   = "Average"

  step_adjustment {
    scaling_adjustment          = 1
    metric_interval_lower_bound = 0.0
    metric_interval_upper_bound = 10.0
  }

  step_adjustment {
    scaling_adjustment          = 2
    metric_interval_lower_bound = 10.0
  }
}

resource "aws_autoscaling_policy" "my_asg_autoscale_down" {
  name                   = "my-asg-autoscale-down"
  autoscaling_group_name = aws_autoscaling_group.my_asg.name
  adjustment_type        = "ChangeInCapacity"
  policy_type            = "StepScaling"

  estimated_instance_warmup = 600 # 10 minutes
  metric_aggregation_type   = "Average"

  step_adjustment {
    scaling_adjustment          = -1
    metric_interval_lower_bound = -20.0
    metric_interval_upper_bound = 0.0
  }

  step_adjustment {
    scaling_adjustment          = -2
    metric_interval_upper_bound = -20.0
  }
}

These policies define the conditions for scaling the number of instances up or down (ChangeInCapacity). With the various step_adjustment blocks, whose bounds are calculated as the difference between the CloudWatch metric and the alarm threshold (defined below), it is possible to add or remove one or even two instances, depending on how high or low consumption is.
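A quick illustration of how the bounds work, using the 80% CPU threshold of the alarm defined below:

CPU at 87%  →  87 − 80 = 7   →  falls in [0, 10)  →  scaling_adjustment = 1 (add one instance)
CPU at 95%  →  95 − 80 = 15  →  falls in [10, ∞)  →  scaling_adjustment = 2 (add two instances)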

As a last step, we define the CloudWatch alarm:

resource "aws_cloudwatch_metric_alarm" "my_asg_metric_autoscale_up" {
  alarm_name          = "my-asg-scale-up"
  comparison_operator = "GreaterThanThreshold"
  metric_name         = "CPUUtilization"
  namespace           = "AWS/ECS"
  period              = "300"
  evaluation_periods  = "3"
  statistic           = "Maximum"
  threshold           = 80

  dimensions = {
    ClusterName = "${aws_ecs_cluster.my_cluster.name}"
  }
  alarm_description = "Monitor CPU utilization"
  alarm_actions     = [aws_autoscaling_policy.my_asg_autoscale_up.arn]
}

With this configuration we use 80% CPU utilisation as the threshold, scaling up by one instance if usage stays between 80% and 90% for three consecutive 5-minute periods (period = 300, evaluation_periods = 3), or by two if it is between 90% and 100%. Similar configurations can be used to decrease the number of instances, and to monitor memory use in addition to CPU. In that case, evaluate the interaction between CPU and memory carefully, otherwise the result could be a tug of war between one metric asking to scale up and another asking to scale down, with unpredictable results. In our case, with relatively stable memory use and a more “unstable” CPU, we decided to scale up on both metrics but to scale down on CPU only.
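For reference, the scale-down alarm can simply mirror the one above. This is a sketch in which the 40% threshold is purely illustrative (combined with the scale-down policy defined earlier, CPU between 20% and 40% removes one instance, below 20% two):

resource "aws_cloudwatch_metric_alarm" "my_asg_metric_autoscale_down" {
  alarm_name          = "my-asg-scale-down"
  comparison_operator = "LessThanThreshold"
  metric_name         = "CPUUtilization"
  namespace           = "AWS/ECS"
  period              = "300"
  evaluation_periods  = "3"
  statistic           = "Maximum"
  threshold           = 40 # illustrative value, to be tuned on the workload

  dimensions = {
    ClusterName = "${aws_ecs_cluster.my_cluster.name}"
  }
  alarm_description = "Monitor CPU utilization (scale down)"
  alarm_actions     = [aws_autoscaling_policy.my_asg_autoscale_down.arn]
}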

The next steps

At this point you should be able (IAM roles aside) to configure an ECS cluster that runs custom processes in Docker containers, adapting the cluster’s computing capacity to current needs. As anticipated in the introduction, in our project we went a step further and decided to use spot instances only. This saved us a great deal (about 65%) but introduced a certain management complexity that we had to address. The next blog post is entirely dedicated to this topic, so see you soon!

Read the article Amazon ECS: exclusive use of spot markets reduced our costs by 65%