Python
AWS¶
Assume a role¶
It's a bit painful to assume a role in Python. Use this function to simplify things
import boto3
def boto3_assume(**KW):
assumed_role_object=boto3.client('sts').assume_role(
RoleArn=f"arn:aws:iam::{KW['account']}:role/{KW['role']}",
RoleSessionName=KW.get('session',KW['role'])
)
return boto3.Session(
aws_access_key_id = assumed_role_object['Credentials']['AccessKeyId'],
aws_secret_access_key = assumed_role_object['Credentials']['SecretAccessKey'],
aws_session_token = assumed_role_object['Credentials']['SessionToken'],
)
if __name__=='__main__':
who_am_i = boto3_assume(role='MyRole',account='000000000000).sts.get_caller_identity()
Organisations¶
Find my master account¶
import boto3
def find_master_account():
return boto3.client('organizations').describe_organization().get('Organization',{}).get('MasterAccountId')
Athena¶
Run an SQL query¶
WORK IN PROGRESS
import boto3
import time
# == function to convert the result from athena to a dictionary
def process_athena(response):
_key_row = response["ResultSet"]["Rows"].pop(0)
_keys = []
for key in _key_row["Data"]:
_keys += key.values()
output = []
while response["ResultSet"]["Rows"]:
result = {_k: None for _k in _keys}
row = response["ResultSet"]["Rows"].pop(0)
_values = []
for key in row["Data"]:
_values += key.values()
for idx, _value in enumerate(_values):
result[_keys[idx]] = _value
output.append(result)
return output
def query_athena(QueryString):
Database = 'default'
WorkGroup = 'default'
athena = boto3.client('athena')
# == make the SQL query
query_id = athena.start_query_execution(
QueryString = QueryString,
#ExecutionParameters = ExecutionParameters,
QueryExecutionContext = { 'Database' : Database },
WorkGroup = WorkGroup
)['QueryExecutionId']
# == wait for it to complete
while True:
finish_state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
if finish_state == "RUNNING" or finish_state == "QUEUED":
time.sleep(1)
else:
break
# == retrieve the results
# TODO - if we get next_token, do the call again
resultx = athena.get_query_results(QueryExecutionId=query_id, MaxResults = 1000)
# == save the results in a dictionary
result = process_athena(resultx)
return result
def main():
result = query_athena('''
SELECT
name,
surname,
phoneno
FROM
employee_data''')
print(result)
S3¶
Read all objects in a bucket¶
import boto3
Bucket = "MYBUCKETNAME"
Prefix = "myPrefix"
for P in boto3.client('s3').get_paginator('list_objects').paginate(Bucket=Bucket,Prefix = Prefix):
for k in P['Contents']:
print(k['Key'])
content = boto3.client('s3').get_object(Bucket=Bucket, Key=k['Key'])['Body'].read().decode('utf-8')
Write to an S3 bucket¶
boto3.resource('s3').Bucket(os.environ['S3BUCKET']).put_object(
ACL = 'bucket-owner-full-control',
ContentType = 'application/json',
Key = key,
Body = json.dumps(newstate,indent=2,default=str)
)
Read an S3 object¶
state = json.loads(boto3.client('s3').get_object(Bucket=os.environ['S3BUCKET'], Key=key)['Body'].read().decode('utf-8'))
SNS¶
Publish a topic¶
boto3.client('sns',region_name = 'ap-southeast-2').publish(TopicArn=os.environ['MonitorTopicSNS'],Message = msg, Subject = subject)
Date and time¶
Convert epoch time to date time
import datetime
x = datetime.datetime.fromtimestamp(epoch).strftime('%Y-%m-%d %H:%M:%S')
Convert string to a Python datetime
import datetime
def string_to_datestamp(t):
return datetime.datetime.strptime(t, '%Y-%m-%d %H:%M:%S')
Get the current datestamp in a string
import datetime
def datestamp():
return datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
Find the first day of the week
import datetime
def firstDayoftheWeek(dte):
FDW = dte - datetime.timedelta(days = dte.weekday())
return FDW
Email¶
Send email through Outlook
import win32com.client
def send_email(to,cc,subject,body):
outlook = win32com.client.Dispatch('outlook.application')
mail = outlook.CreateItem(0)
mail.To = to
mail.Subject = subject
mail.HTMLBody = body
mail.cc = cc
mail.Display()
#mail.Send()
Files¶
Find a file age in seconds¶
import datetime
import os
filename = '/tmp/myfile.txt'
age = (datetime.datetime.today() - datetime.datetime.fromtimestamp(os.path.getmtime(filename))).seconds
print(f"age in seconds is {age}"
Find all files in a directory¶
import os
import fnmatch
def findFiles(path,filter = '*'):
q = []
for r, d, f in os.walk(path):
for file in f:
if fnmatch.fnmatch(file,filter):
q.append(os.path.join(r, file).replace('\\','/')[len(path)+1:])
return q
Find the path of my script¶
import os.path
file_path = os.path.dirname(os.path.realpath(__file__))
Write a file to disk or S3¶
import json
import boto3
import botocore
def writeFile(filename,data):
# -- what type of data is it?
if(type(data) != str):
Body = json.dumps(data,default=str)
ContentType = 'application/json'
else:
Body = data
ContentType = 'application/text'
# - is it local, or s3?
if filename.startswith('s3://'):
bucket = filename.split('/')[2]
key = '/'.join(filename.split('/')[3:])
print(f"bucket = {bucket}")
print(f"key = {key}")
try:
boto3.resource('s3').Bucket(bucket).put_object(
ACL = 'bucket-owner-full-control',
ContentType = ContentType,
Key = key,
Body = Body
)
except botocore.exceptions.ClientError as error:
print(f"WARNING - s3.put_object - {error.response['Error']['Code']}")
else:
print(f"filename = {filename}")
with open(filename,'wt') as f:
f.write(Body)
if __name__=='__main__':
x = { 'some' : 'data'}
writeFile('myfile.txt',x)
writeFile('s3://mybucket/2023/05/02/myfile.txt',x)
CSV¶
Write a CSV file¶
import csv
def writeCSV(file,data):
os.makedirs(os.path.dirname(file),exist_ok = True)
headers = list(data[0])
with open(file, 'wt', newline='') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(headers)
for x in data:
row = []
for h in headers:
row.append(x.get(h))
csvwriter.writerow(row)
if __name__ == '__main__':
data = [
{ "header1" : "data1", "header2" : "data2" },
{ "header1" : "data3", "header2" : "data4" }
]
writeCSV('output.csv',data)
Read a CSV file¶
import csv
def readCSV(file):
output = []
with open(file, 'rt',encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
output.append(row)
return output
if __name__ == '__main__':
for x in readCSV('c:/temp/test.csv'):
print(x)
Slack or Discord¶
Sending messages via Slack or Discord
import json
import urllib.request
import urllib.parse
def slackdiscord(webHook,message):
if 'discord' in webHook:
msg = { 'content' : message }
else:
msg = { 'text' : message }
req = urllib.request.Request(
webHook,
json.dumps(msg).encode('utf-8'),
{
'Content-Type': 'application/json',
'User-Agent' : 'Mozilla/5.0 (X11; U; Linux i686) Gecko/20071127 Firefox/2.0.0.11'
}
)
resp = urllib.request.urlopen(req)
return resp.read()
Web monitoring¶
import urllib.request
def website_monitor(url):
#print(f'URL : {url}')
try:
req = urllib.request.Request(url, method='HEAD')
resp = urllib.request.urlopen(req, timeout=10)
#print(resp.getcode())
return resp.getcode() == 200
except:
return False
return False
Excel¶
Write an Excel file from a dictionary¶
import xlsxwriter
def dict2xls(L,F,fmt = {}):
print(f"Writing {F}")
workbook = xlsxwriter.Workbook(F)
COLOURS = {
'darkblue' : { 'bold': True, 'font_color' : 'white', 'bg_color' : '#44546A', 'align' : 'center' },
'green' : { 'bold': True, 'font_color' : 'white', 'bg_color' : '#00B050', 'align' : 'center' },
'red' : { 'bold': True, 'font_color' : 'white', 'bg_color' : '#C00000', 'align' : 'center' },
'gray' : { 'bold': False, 'font_color' : 'black', 'bg_color' : '#C0C0C0', 'align' : 'center' }
}
heading = workbook.add_format(COLOURS['darkblue'])
for sheet in L:
worksheet = workbook.add_worksheet(sheet)
row = 0
col = 0
for H in L[sheet][0]:
worksheet.write(row,col,H,heading)
col += 1
for a in L[sheet]:
row += 1
col = 0
for H in L[sheet][0]:
# -- is there formatting involved?
cellformat = None
if H in fmt:
F = fmt[H].get(a[H])
if F != None:
cellformat = workbook.add_format(COLOURS[F])
worksheet.write(row,col,a[H],cellformat)
col += 1
worksheet.autofilter(0,0,row,col-1)
worksheet.freeze_panes(1, 1)
workbook.close()
if __name__ == '__main__':
data = { 'Sheet1' : [
{
'Heading 1' : 'Data 1',
'Heading 2' : 'Data 2'
},
{
'Heading 1' : 'Data 3',
'Heading 2' : 'Data 4'
},
]}
dict2xls(data, 'c:/temp/myexcelfile.xlsx')
Reporting¶
crosstab¶
import json
def add(matrix,x,y,z):
# -- index X
if not 'x' in matrix:
matrix['x'] = []
if not x in matrix['x']:
matrix['x'].append(x)
# -- index Y
if not 'y' in matrix:
matrix['y'] = []
if not y in matrix['y']:
matrix['y'].append(y)
# -- index z
if not 'z' in matrix:
matrix['z'] = {}
if not x in matrix['z']:
matrix['z'][x] = {}
matrix['z'][x][y] = z
def render_markdown(matrix):
matrix['x'].sort()
matrix['y'].sort()
out = "||**" + "**|**".join(matrix['x']) + "**|\n"
out += "|--" * (len(matrix['x']) + 1) + "|\n"
for y in matrix['y']:
out += f"|**{y}**|"
for x in matrix['x']:
out += f"{matrix['z'][x].get(y,'')}|"
out += "\n"
return out
def render_html(matrix):
matrix['x'].sort()
matrix['y'].sort()
out = '<table>\n'
out += "<tr><th> </th><th>" + "</th><th>".join(matrix['x']) + "</th></tr>\n"
for y in matrix['y']:
out += f"<tr><th>{y}</th>"
for x in matrix['x']:
out += f"<td>{matrix['z'][x].get(y,'')}</td>"
out += "</tr>\n"
out += '</table>\n'
return out
def main():
x = {}
add(x,'X1','Y1','hello')
add(x,'X2','Y1','there')
add(x,'X1','Y2','Is')
add(x,'X2','Y2','this')
add(x,'X3','Y2','working')
add(x,'X4','Y3','Howdy')
# render markdown
with open('output.md','wt') as F:
F.write('# The main thing\n\n')
F.write(render_markdown(x))
# render HTML
with open('output.html','wt') as F:
F.write('<html>\n')
F.write('<head><style>body { font-family:verdana;} th { background-color: #0047AB; color: #FFFFFF } td { background-color: ##B2FFFF; } table,th,td { border: 1px solid gray; border-collapse: collapse; padding: 5px; } </style></head>')
F.write('<h1>The HTML thing</h1>\n\n')
F.write(render_html(x))
F.write('</html>')
main()
Command-line arguments¶
import argparse
parser = argparse.ArgumentParser(description='CloudFormation Helper')
# -- read a variable
parser.add_argument('-desc',help='Set a description for the CloudFormation file')
# -- Mandatory variable
parser.add_argument('-cf', help='Path to the CloudFormation json file', required=True)
# -- Provide multiple variables in a list
parser.add_argument('-add',help='Add a new resource to the CloudFormation file',nargs='+')
# -- Make it a toggle switch
parser.add_argument('-overwrite', help='Forces an overwrite of a resource if it already exists', action='store_true')
# -- consume the data
args = parser.parse_args()
print(args.desc)