Automation with Python
Introduction to Boto Library: AWS SDK for Python
- We used Terraform to set up our AWS infrastructure
- What about automating maintenance tasks?
- Doing regular back-ups
- Doing regular clean-ups
- Configurations on existing servers
- Doing health checks / monitoring
- Boto lets Python programs create, manage, and configure AWS resources
- Python can do the things we did with Terraform
- Even though Python can also do infrastructure provisioning, it is important to know which tool to use for which task
Install Boto3 And Connect to AWS
pip install boto3
- Boto uses your default AWS credentials in the ~/.aws directory for authentication
- ~/.aws/credentials and ~/.aws/config
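A minimal example of what these two files typically contain (placeholder values, not real keys):
~/.aws/credentials
[default]
aws_access_key_id = <your-access-key-id>
aws_secret_access_key = <your-secret-access-key>
~/.aws/config
[default]
region = us-east-1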
Getting familiar with Boto3
Get CIDR Block State
import boto3
ec2_client = boto3.client('ec2')
all_available_vpcs = ec2_client.describe_vpcs()
vpcs = all_available_vpcs["Vpcs"]
for vpc in vpcs:
    print(vpc["VpcId"])
    cidr_block_assoc_sets = vpc["CidrBlockAssociationSet"]
    for assoc_set in cidr_block_assoc_sets:
        print(assoc_set["CidrBlockState"])
Connect to Non-Default Region
- Can use a named parameter
import boto3
ec2_client = boto3.client('ec2', region_name="us-east-2")
all_available_vpcs = ec2_client.describe_vpcs()
vpcs = all_available_vpcs["Vpcs"]
Create VPC and Subnets
- client vs resource
- Client is more low-level. It provides one-to-one mapping to underlying HTTP API operations
- Resource provides resource objects to access attributes and perform actions. It's high level and object-oriented.
- Think of it as wrapping the client under the hood
- Resource gives us an object that we can use to make subsequent calls to that resource
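A minimal sketch of the difference (assumes at least one VPC exists in the default region):
import boto3

# client: low-level, returns plain dicts mirroring the HTTP API response
ec2_client = boto3.client('ec2')
print(ec2_client.describe_vpcs()['Vpcs'][0]['VpcId'])

# resource: object-oriented wrapper over the same API
ec2_resource = boto3.resource('ec2')
first_vpc = list(ec2_resource.vpcs.all())[0]
print(first_vpc.id, first_vpc.cidr_block)  # attributes instead of dict keys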
main.py
import boto3
ec2_client = boto3.client('ec2', region_name="us-east-2")
ec2_resource = boto3.resource('ec2', region_name="us-east-2")
new_vpc = ec2_resource.create_vpc(
    CidrBlock="10.0.0.0/16"
)
new_vpc.create_tags(
    Tags=[
        {
            'Key': 'Name',
            'Value': 'my-vpc'
        }
    ]
)
new_vpc.create_subnet(
    CidrBlock="10.0.1.0/24"
)
new_vpc.create_subnet(
    CidrBlock="10.0.2.0/24"
)
all_available_vpcs = ec2_client.describe_vpcs()
vpcs = all_available_vpcs["Vpcs"]
for vpc in vpcs:
    print(vpc["VpcId"])
    cidr_block_assoc_sets = vpc["CidrBlockAssociationSet"]
    for assoc_set in cidr_block_assoc_sets:
        print(assoc_set["CidrBlockState"])
Terraform vs Python
- Terraform manages the state of infrastructure
  - TF knows the current state
  - TF knows the difference between the current state and your configured/desired state
  - TF is idempotent
  - You declare the end result
  - Easier to write
- Python
  - Python does not track state
  - Is not idempotent (see the sketch after this list)
  - When removing resources, you have to explicitly delete them
- TF is much better for infrastructure provisioning
- Use cases for Boto3
  - Can do way more because of its low-level API
  - More complex logic is possible
  - Boto is a full-fledged AWS library
  - Monitoring
  - Backups
  - Scheduled tasks, etc.
  - Can add a web interface
- Know when to use each tool
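A quick sketch of the idempotency difference (region and CIDR block are arbitrary):
import boto3

ec2_resource = boto3.resource('ec2', region_name="us-east-2")

# Running this script twice creates two VPCs with the same CIDR block:
# boto3 just issues the API call; it does not compare against existing state.
new_vpc = ec2_resource.create_vpc(CidrBlock="10.0.0.0/16")
print(new_vpc.id)

# terraform apply, by contrast, would see in its state file that the VPC
# already exists and make no changes on a second run.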
EC2 server status check
- Scenario
- Created hundreds of EC2 servers with Terraform
- Configured autoscaling
- Instances get created/deleted all the time
- New instances always need some time to initialize
- We need to know the state of our instances
- First Step: Create 3 EC2 Instances with TF
Create EC2 Instances with TF
main.tf
provider "aws" {
region = "us-east-1"
}
variable "vpc_cidr_block" {}
variable "subnet_cidr_block" {}
variable "avail_zone" {}
variable "env_prefix" {}
variable "my_ip" {}
variable "instance_type" {}
variable "public_key" {}
variable "public_key_location" {}
variable "image_name" {}
# VPC
resource "aws_vpc" "myapp-vpc" {
cidr_block = var.vpc_cidr_block
tags = {
Name: "${var.env_prefix}-vpc",
}
}
# Subnet
resource "aws_subnet" "myapp-subnet-1" {
vpc_id = aws_vpc.myapp-vpc.id
cidr_block = var.subnet_cidr_block
availability_zone = var.avail_zone
tags = {
Name: "${var.env_prefix}-subnet-1"
}
}
# Internet gateway
resource "aws_internet_gateway" "myapp-igw" {
vpc_id = aws_vpc.myapp-vpc.id
tags = {
Name: "${var.env_prefix}-igw"
}
}
# Default route table
resource "aws_default_route_table" "main-rtb" {
default_route_table_id = aws_vpc.myapp-vpc.default_route_table_id
route {
cidr_block = "0.0.0.0/0"
gateway_id = aws_internet_gateway.myapp-igw.id
}
tags = {
Name: "${var.env_prefix}-main-rtb"
}
}
# Security Group
resource "aws_default_security_group" "myapp-default-sg" {
vpc_id = aws_vpc.myapp-vpc.id
ingress {
from_port = 22
protocol = "tcp"
to_port = 22
cidr_blocks = [var.my_ip]
}
ingress {
from_port = 8080
protocol = "tcp"
to_port = 8080
cidr_blocks = ["0.0.0.0/0"]
}
egress {
from_port = 0
protocol = "-1"
to_port = 0
cidr_blocks = ["0.0.0.0/0"]
prefix_list_ids = []
}
tags = {
Name: "${var.env_prefix}-default-sg"
}
}
# Data for AWS AMI
data "aws_ami" "latest-amazon-linux-image" {
most_recent = true
owners = ["amazon"]
filter {
name = "name"
values = [var.image_name]
}
filter {
name = "virtualization-type"
values = ["hvm"]
}
}
# Output ami id
output "aws_ami_id" {
value = data.aws_ami.latest-amazon-linux-image.id
}
# Output EC2 public IP
output "ec2_public_ip" {
value = aws_instance.myapp-server.public_ip
}
# SSH Key
resource "aws_key_pair" "ssh-key" {
key_name = "ec2-server-key"
public_key = file(var.public_key_location)
# public_key = var.public_key
}
# EC2 instance
resource "aws_instance" "myapp-server" {
ami = data.aws_ami.latest-amazon-linux-image.id
instance_type = var.instance_type
subnet_id = aws_subnet.myapp-subnet-1.id
vpc_security_group_ids = [aws_default_security_group.myapp-default-sg.id]
availability_zone = var.avail_zone
associate_public_ip_address = true
key_name = aws_key_pair.ssh-key.key_name
user_data = file("entry-script.sh")
tags = {
Name = "${var.env_prefix}-server"
}
}
resource "aws_instance" "myapp-server-two" {
ami = data.aws_ami.latest-amazon-linux-image.id
instance_type = var.instance_type
subnet_id = aws_subnet.myapp-subnet-1.id
vpc_security_group_ids = [aws_default_security_group.myapp-default-sg.id]
availability_zone = var.avail_zone
associate_public_ip_address = true
key_name = aws_key_pair.ssh-key.key_name
user_data = file("entry-script.sh")
tags = {
Name = "${var.env_prefix}-server-two"
}
}
resource "aws_instance" "myapp-server-three" {
ami = data.aws_ami.latest-amazon-linux-image.id
instance_type = var.instance_type
subnet_id = aws_subnet.myapp-subnet-1.id
vpc_security_group_ids = [aws_default_security_group.myapp-default-sg.id]
availability_zone = var.avail_zone
associate_public_ip_address = true
key_name = aws_key_pair.ssh-key.key_name
user_data = file("entry-script.sh")
tags = {
Name = "${var.env_prefix}-server-three"
}
}
terraform.tfvars
vpc_cidr_block = "10.0.0.0/16"
subnet_cidr_block = "10.0.10.0/24"
avail_zone = "us-east-1a"
env_prefix = "dev"
my_ip = "197.251.184.147/32"
instance_type = "t3.micro"
public_key = ""
public_key_location = "/Users/alfredamoah/.ssh/id_rsa.pub"
image_name = "amzn2-ami-kernel-*-x86_64-gp2"
Print EC2 Instance State
main.py
import boto3
ec2_client = boto3.client('ec2', region_name="us-east-1")
ec2_resource = boto3.resource('ec2', region_name="us-east-1")
def check_instance_status():
    statuses = ec2_client.describe_instance_status(
        IncludeAllInstances=True
    )
    for status in statuses['InstanceStatuses']:
        ins_status = status['InstanceStatus']['Status']
        sys_status = status['SystemStatus']['Status']
        state = status['InstanceState']['Name']
        print(f"Instance {status['InstanceId']} is {state} with instance status {ins_status} and system status {sys_status}")
    print("#############################\n")
check_instance_status()
Write a scheduled task in Python
Scheduling Status Checks
- Scheduler that triggers program automatically
pip install schedule
main.py
import boto3
import schedule
ec2_client = boto3.client('ec2', region_name="us-east-1")
ec2_resource = boto3.resource('ec2', region_name="us-east-1")
def check_instance_status():
    statuses = ec2_client.describe_instance_status(
        IncludeAllInstances=True
    )
    for status in statuses['InstanceStatuses']:
        ins_status = status['InstanceStatus']['Status']
        sys_status = status['SystemStatus']['Status']
        state = status['InstanceState']['Name']
        print(f"Instance {status['InstanceId']} is {state} with instance status {ins_status} and system status {sys_status}")
    print("#############################\n")

schedule.every(5).seconds.do(check_instance_status)
# schedule.every().day.at("1:00")
# schedule.every().monday.at("13:00")

while True:
    schedule.run_pending()
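One small refinement worth knowing: run_pending() only checks whether a job is due, so this loop spins at full speed; sleeping briefly between checks is the usual pattern with the schedule library:
import time

while True:
    schedule.run_pending()
    time.sleep(1)  # check once per second instead of busy-waiting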
Configure server: Add environment tags to EC2 instances
- Scenario
- We have 20 prod servers in Paris
- We have 10 dev servers in Frankfurt
- We want to add environment tags to all servers
main.py
import boto3
ec2_client_paris = boto3.client('ec2', region_name="eu-west-3")
ec2_resource_paris = boto3.resource('ec2', region_name="eu-west-3")
ec2_client_frankfurt = boto3.client('ec2', region_name="eu-central-1")
ec2_resource_frankfurt = boto3.resource('ec2', region_name="eu-central-1")
instance_ids_paris = []
instance_ids_frankfurt = []
reservations_paris = ec2_client_paris.describe_instances()['Reservations']
for res in reservations_paris:
    instances = res['Instances']
    for ins in instances:
        instance_ids_paris.append(ins['InstanceId'])

response = ec2_resource_paris.create_tags(
    Resources=instance_ids_paris,
    Tags=[
        {
            'Key': 'environment',
            'Value': 'prod'
        },
    ]
)

reservations_frankfurt = ec2_client_frankfurt.describe_instances()['Reservations']
for res in reservations_frankfurt:
    instances = res['Instances']
    for ins in instances:
        instance_ids_frankfurt.append(ins['InstanceId'])

response = ec2_resource_frankfurt.create_tags(
    Resources=instance_ids_frankfurt,
    Tags=[
        {
            'Key': 'environment',
            'Value': 'dev'
        },
    ]
)
EKS Cluster Information
- Scenario
- We have 10 EKS clusters
- We want an overview of all running clusters
- Which K8s version each cluster is running
- Cluster endpoints
main.py
import boto3
client = boto3.client('eks', region_name="eu-west-3")
clusters = client.list_clusters()['clusters']
for cluster in clusters:
    response = client.describe_cluster(
        name=cluster
    )
    cluster_info = response['cluster']
    cluster_status = cluster_info['status']
    cluster_endpoint = cluster_info['endpoint']
    cluster_version = cluster_info['version']
    print(f"Cluster {cluster} status is {cluster_status}")
    print(f"Cluster endpoint: {cluster_endpoint}")
    print(f"Cluster version: {cluster_version}")
Backup EC2 volumes: Automate creating snapshots
- Automates backups for EC2 instances
- Volumes are the AWS storage components that store EC2 instance data
- Every instance has its own volume
- Kind of like the hard drive for the instance
- When we delete the instance, its root volume is (by default) deleted with it
- Volume snapshot: a copy of a volume at a point in time
- Scenario
- We have 50 EC2 volumes that have not been backed up
- We want to do daily backups
main.py
import boto3
import schedule
ec2_client = boto3.client('ec2', region_name="us-east-1")
def create_volume_snapshots():
    volumes = ec2_client.describe_volumes(
        Filters=[
            {
                "Name": "tag:Name",
                "Values": ["prod"]
            }
        ]
    )
    for volume in volumes['Volumes']:
        new_snapshot = ec2_client.create_snapshot(
            VolumeId=volume['VolumeId']
        )
        print(new_snapshot)

schedule.every().day.do(create_volume_snapshots)

while True:
    schedule.run_pending()
Automate cleanup of snapshots
- We only need one or two of the latest snapshots
main.py
import boto3
from operator import itemgetter
ec2_client = boto3.client('ec2', region_name="us-east-1")
volumes = ec2_client.describe_volumes(
    Filters=[
        {
            "Name": "tag:Name",
            "Values": ["dev-server"]
        }
    ]
)
for volume in volumes['Volumes']:
    snapshots = ec2_client.describe_snapshots(
        OwnerIds=['self'],
        Filters=[
            {
                "Name": "volume-id",
                "Values": [volume["VolumeId"]]
            }
        ]
    )
    sorted_by_date = sorted(snapshots['Snapshots'], key=itemgetter('StartTime'), reverse=True)
    # Maintain last 2 snapshots
    for snap in sorted_by_date[2:]:
        response = ec2_client.delete_snapshot(
            SnapshotId=snap['SnapshotId']
        )
        print(response)
Automate restoring EC2 volume from backup
- Scenario
- We want to recover the latest working state of the EC2 instance
- Create new volume from the snapshot
- Attach new volume to EC2 instance
main.py
import boto3
from operator import itemgetter
ec2_client = boto3.client('ec2', region_name="us-east-1")
ec2_resource = boto3.resource('ec2', region_name="us-east-1")
instance_id = "i-071d0936ff8344f29"
# Get instance volumes
volumes = ec2_client.describe_volumes(
    Filters=[
        {
            "Name": "attachment.instance-id",
            "Values": [instance_id]
        }
    ]
)
instance_volume = volumes['Volumes'][0]

# Get instance volume snapshots
snapshots = ec2_client.describe_snapshots(
    OwnerIds=['self'],
    Filters=[
        {
            "Name": "volume-id",
            "Values": [instance_volume['VolumeId']]
        }
    ]
)
latest_snapshot = sorted(snapshots['Snapshots'], key=itemgetter('StartTime'), reverse=True)[0]
print(latest_snapshot['StartTime'])

new_volume = ec2_client.create_volume(
    SnapshotId=latest_snapshot['SnapshotId'],
    AvailabilityZone="us-east-1a",
    TagSpecifications=[
        {
            'ResourceType': 'volume',
            'Tags': [
                {
                    'Key': 'Name',
                    'Value': 'prod'
                }
            ]
        }
    ]
)

while True:
    vol = ec2_resource.Volume(new_volume['VolumeId'])
    print(vol.state)
    if vol.state == 'available':
        ec2_resource.Instance(instance_id).attach_volume(
            VolumeId=new_volume['VolumeId'],
            Device='/dev/xvdb'
        )
        break
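As an aside, boto3 ships built-in waiters that do this polling for us, so the while loop above could alternatively be written as:
# Block until EC2 reports the new volume as available, then attach it
waiter = ec2_client.get_waiter('volume_available')
waiter.wait(VolumeIds=[new_volume['VolumeId']])
ec2_resource.Instance(instance_id).attach_volume(
    VolumeId=new_volume['VolumeId'],
    Device='/dev/xvdb'
)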
Handling Errors
- Errors can occur when interacting with AWS resources
- Write code that handles them
- Terraform can handle rollbacks itself because it tracks state
- In Python, you do it yourself
- Use try...except
for volume in volumes['Volumes']:
    try:
        new_snapshot = ec2_client.create_snapshot(
            VolumeId=volume['VolumeId']
        )
        print(new_snapshot)
    except Exception as ex:
        print(f"An error occurred: {ex}")
Website monitoring 1: Scheduled Task to Monitor App Health
- We can automate other things that have nothing to do with AWS
- We will monitor the website and fix the problem
- Preparation Steps
- Create a Server on Linode Cloud Platform
- Install Docker on server
- Run nginx container
- Write Automation Program
- Write Python program that checks application
- Send email when website is down
- Automate fixing the problem
- Restart Docker container
- Restart Server
Step 1
- Create linode server
- Install docker
- Run nginx
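A sketch of those commands on the server (assuming an Ubuntu image; package names differ per distro):
apt update && apt install -y docker.io    # assumption: Ubuntu's packaged Docker
docker run -d -p 8080:80 nginx            # run nginx, published on port 8080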
Step 2: Website Request
import requests
response = requests.get("http://172-104-226-116.ip.linodeusercontent.com:8080")
if response.status_code == 200:
    print("Application is running")
else:
    print("Application Down. Fix it")
Website monitoring 2: Automated Email Notification
with statement
- Alternative to try/finally statements
- Used in exception handling and cleanup code to make the code cleaner
- Used with unmanaged resources (e.g. file handling); see the sketch below
Gmail setup
- Need to allow less secure apps to log in to your Gmail account (Less Secure Apps setting)
- Won't work if you have 2FA; add an app password instead
- Can set the email credentials as environment variables in PyCharm
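A minimal file-handling sketch of the with statement versus explicit try/finally (the file name is hypothetical):
# try/finally version: we must remember to close the file ourselves
f = open('log.txt')  # hypothetical file
try:
    data = f.read()
finally:
    f.close()

# with version: the file is closed automatically, even if read() raises
with open('log.txt') as f:
    data = f.read()
The monitoring script below uses the same pattern for the SMTP connection.
main.py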
import requests
import smtplib
import os
EMAIL_ADDRESS = os.environ.get('EMAIL_ADDRESS')
EMAIL_PASSWORD = os.environ.get('EMAIL_PASSWORD')
def send_notification(email_msg):
    print('Sending an email...')
    with smtplib.SMTP('smtp.gmail.com', 587) as smtp:
        smtp.starttls()
        smtp.ehlo()
        smtp.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
        message = f"Subject: SITE DOWN\n{email_msg}"
        smtp.sendmail(EMAIL_ADDRESS, EMAIL_ADDRESS, message)

response = requests.get("http://172-104-226-116.ip.linodeusercontent.com:8080")
if response.status_code == 200:
    print("Application is running")
else:
    print("Application Down. Fix it")
    msg = f'Application returned {response.status_code}'
    send_notification(msg)
Handle Connection Error
- What if the server does not even send a response?
import requests
import smtplib
import os
EMAIL_ADDRESS = os.environ.get('EMAIL_ADDRESS')
EMAIL_PASSWORD = os.environ.get('EMAIL_PASSWORD')
def send_notification(email_msg):
    print('Sending an email...')
    with smtplib.SMTP('smtp.gmail.com', 587) as smtp:
        smtp.starttls()
        smtp.ehlo()
        smtp.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
        message = f"Subject: SITE DOWN\n{email_msg}"
        smtp.sendmail(EMAIL_ADDRESS, EMAIL_ADDRESS, message)

def monitor_application():
    try:
        response = requests.get("http://172-104-226-116.ip.linodeusercontent.com:8080")
        if response.status_code == 200:
            print("Application is running")
        else:
            print("Application Down. Fix it")
            msg = f'Application returned {response.status_code}'
            send_notification(msg)
    except Exception as ex:
        print(f'Connection error happened: {ex}')
        msg = 'Application not accessible at all'
        send_notification(msg)

monitor_application()
Website monitoring 3: Restart application and Reboot Server
Restart The Application
- Connect to Linode server
- Restart container
pip install paramiko
paramiko
- Python implementation of the SSHv2 protocol
- Library for making SSH connections (client or server)
main.py
import requests
import smtplib
import os
import paramiko
EMAIL_ADDRESS = os.environ.get('EMAIL_ADDRESS')
EMAIL_PASSWORD = os.environ.get('EMAIL_PASSWORD')
def send_notification(email_msg):
    print('Sending an email...')
    with smtplib.SMTP('smtp.gmail.com', 587) as smtp:
        smtp.starttls()
        smtp.ehlo()
        smtp.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
        message = f"Subject: SITE DOWN\n{email_msg}"
        smtp.sendmail(EMAIL_ADDRESS, EMAIL_ADDRESS, message)

def restart_container():
    print('Restarting the application...')
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())  # accept the server's host key automatically so the connection is allowed
    ssh.connect(hostname='172.104.226.116', username='root', key_filename='/Users/alfredamoah/.ssh/id_rsa')
    stdin, stdout, stderr = ssh.exec_command('docker start 558d2a4099a0')
    print(stdout.readlines())
    ssh.close()

def monitor_application():
    try:
        response = requests.get("http://172-104-226-116.ip.linodeusercontent.com:8080")
        if response.status_code == 200:
            print("Application is running")
        else:
            print("Application Down. Fix it")
            msg = f'Application returned {response.status_code}'
            send_notification(msg)
            restart_container()
    except Exception as ex:
        print(f'Connection error happened: {ex}')
        msg = 'Application not accessible at all'
        send_notification(msg)

monitor_application()
Reboot The Server
- We'll use Python's Linode library
pip install linode_api4
- Need Linode Token
- Create API Access Token
- Need Linode ID as well
Reboot server and schedule tasks
import requests
import smtplib
import os
import paramiko
import linode_api4
import time
import schedule
EMAIL_ADDRESS = os.environ.get('EMAIL_ADDRESS')
EMAIL_PASSWORD = os.environ.get('EMAIL_PASSWORD')
LINODE_TOKEN = os.environ.get('LINODE_TOKEN')
def send_notification(email_msg):
    print('Sending an email...')
    with smtplib.SMTP('smtp.gmail.com', 587) as smtp:
        smtp.starttls()
        smtp.ehlo()
        smtp.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
        message = f"Subject: SITE DOWN\n{email_msg}"
        smtp.sendmail(EMAIL_ADDRESS, EMAIL_ADDRESS, message)

def restart_container():
    print('Restarting the application...')
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())  # accept the server's host key automatically so the connection is allowed
    ssh.connect(hostname='172.104.226.116', username='root', key_filename='/Users/alfredamoah/.ssh/id_rsa')
    stdin, stdout, stderr = ssh.exec_command('docker start 558d2a4099a0')
    print(stdout.readlines())
    ssh.close()

def restart_server_and_container():
    # restart linode server
    print('Rebooting the server...')
    client = linode_api4.LinodeClient(LINODE_TOKEN)
    nginx_server = client.load(linode_api4.Instance, 45162501)
    nginx_server.reboot()
    # once the server is back up, restart the application
    while True:
        nginx_server = client.load(linode_api4.Instance, 45162501)
        if nginx_server.status == 'running':
            time.sleep(5)
            restart_container()
            break

def monitor_application():
    try:
        response = requests.get("http://172-104-226-116.ip.linodeusercontent.com:8080")
        if response.status_code == 200:
            print("Application is running")
        else:
            print("Application Down. Fix it")
            msg = f'Application returned {response.status_code}'
            send_notification(msg)
            restart_container()
    except Exception as ex:
        print(f'Connection error happened: {ex}')
        msg = 'Application not accessible at all'
        send_notification(msg)
        restart_server_and_container()

# monitor_application()
schedule.every(5).minutes.do(monitor_application)

while True:
    schedule.run_pending()
- Can send a message if the application was down and we fixed the issue.
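A minimal sketch of that recovery notice, assuming we generalize send_notification to take a subject line:
def send_notification(subject, email_msg):
    print('Sending an email...')
    with smtplib.SMTP('smtp.gmail.com', 587) as smtp:
        smtp.starttls()
        smtp.ehlo()
        smtp.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
        smtp.sendmail(EMAIL_ADDRESS, EMAIL_ADDRESS, f"Subject: {subject}\n{email_msg}")

# ...called after restart_container() / restart_server_and_container() succeed:
send_notification('SITE RECOVERED', 'Application was down; it was restarted and is running again')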