diff --git a/README.md b/README.md index 56453fdb7bd47635523b173d8b3e6e56fcb5e3fe..ca2eb312dfdbdf23690e7200dcb0ae2426d36702 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,3 @@ -# Изучение AWS -Ð’Ñе материалы Ð´Ð»Ñ Ð¸Ð·ÑƒÑ‡ÐµÐ½Ð¸Ñ Ð¸Ñкать в вики проекта. +# study + +Useful materials for the "Study AWS" group \ No newline at end of file diff --git a/p.kirilin/Makefile b/p.kirilin/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..2db08555d9fd4819c69d4db1a94edc24b5919aaf --- /dev/null +++ b/p.kirilin/Makefile @@ -0,0 +1,43 @@ +TASK1_PATH=./task.1/ +TASK2_PATH=./task.2/ +S3_BUCKET_NAME=s3rius-weather-bucket + +# Build infrastructure for task1. +task1: + cd "${TASK1_PATH}"; \ + sh build.sh "${S3_BUCKET_NAME}" + +# Destroy all infrastructure from task.1. +task1_destroy: + cd "${TASK1_PATH}"; \ + terraform destroy + +# Build infrastructure for task2. +task2: + cd "${TASK2_PATH}"; \ + sh build.sh "${S3_BUCKET_NAME}" + +task2_import: + cd "${TASK2_PATH}"; \ + terraform import module.services.aws_s3_bucket.s3_weather_bucket "${S3_BUCKET_NAME}" + +# Deploy infrastructures from first and second tasks. +task2_full: task1 task2_import task2 + +# Destroy second task infrastructure +task2_destroy: + cd "${TASK2_PATH}"; \ + terraform destroy + +# Generate and publish messages to Kinesis Firehose. 
+firehose_publish: +ifdef num + cd "${TASK1_PATH}/fire_publisher"; \ + make num=${num} +else + cd "${TASK1_PATH}/fire_publisher"; \ + make +endif + +# Destroy all +destroy: task1_destroy task2_destroy diff --git a/p.kirilin/README.md b/p.kirilin/README.md index f4f111d3e4a98d082e82b26d860cc871233025a2..eea2aaf33ba91a28596121da173b7a3bf81e32d3 100644 --- a/p.kirilin/README.md +++ b/p.kirilin/README.md @@ -73,6 +73,16 @@ Триггер не указываетÑÑ Ð² Lambda </a> </li> + <li> + <a href="#не-могу-переиÑпользовать-бакет-s3"> + Ðе могу переиÑпользовать бакет S3 + </a> + </li> + <li> + <a href="#что-то-Ñ-не-понÑл-а-как-работать-Ñ-asg-и-ec2"> + Что-то Ñ Ð½Ðµ понÑл, как работать Ñ Auto scaling group и решать задачи + </a> + </li> </ul> </li> <li> @@ -150,7 +160,7 @@ pub async fn process(e: CustomEvent) -> Result<CustomOutput, LambdaError> { Ðто потребуетÑÑ Ð´Ð»Ñ Ð¸ÑÐ¿Ð¾Ð»ÑŒÐ·Ð¾Ð²Ð°Ð½Ð¸Ñ Ñторонних ÑервиÑов Ðмазона в лÑмбдах. ## Rust as Lambda -Ðемного о раÑте как Ñ€ Ñреде Ð´Ð»Ñ Ð·Ð°Ð¿ÑƒÑка лÑмбд в AWS. +Ðемного о раÑте как о Ñреде Ð´Ð»Ñ Ð·Ð°Ð¿ÑƒÑка лÑмбд в AWS. РаÑÑ‚ ÑвлÑетÑÑ ÐºÐ¾Ð¼Ð¿Ð¸Ð»Ð¸Ñ€ÑƒÐµÐ¼Ñ‹Ð¼ Ñзыком и реального рантайма под него в AWS нет, и именно поÑтому нужно иÑпользовать рантайм [Provided](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-custom.html); @@ -225,7 +235,7 @@ path = "src/main.rs" Ð”Ð»Ñ Ð¿ÐµÑ€Ð²Ð¾Ð³Ð¾ Ð·Ð°Ð´Ð°Ð½Ð¸Ñ Ð½ÑƒÐ¶Ð½Ñ‹ права на запуÑк лÑмбды, выгрузку логов и возможноÑть запиÑи в s3. Вот конфигурациÑ: -```hcl-terraform +```terraform data "aws_iam_policy_document" "fire_extra_policy_doc" { # allow to write logs statement { @@ -259,7 +269,6 @@ data "aws_iam_policy_document" "fire_extra_policy_doc" { statement { effect = "Allow" actions = [ - "sts:AssumeRole", "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ] @@ -272,7 +281,7 @@ data "aws_iam_policy_document" "fire_extra_policy_doc" { Потом Ñледует приатачить права к вашей ÑущеÑтвующей роли firehose. 
-```hcl-terraform +```terraform resource "aws_iam_policy" "firehose_extra_policy" { name = "firehose-extra-permissions-policy" policy = data.aws_iam_policy_document.fire_extra_policy_doc.json @@ -307,12 +316,11 @@ resource "aws_iam_role_policy_attachment" "firehose_iam_role_policy_attachment" ### Где хранить policy и roles? Так как terraform ÑвлÑетÑÑ Ð´Ð¾Ð²Ð¾Ð»ÑŒÐ½Ð¾ Ñвободной, в плане конфигурации, штукой то роли, да и вообще любые куÑки инфраÑтруктуры, можно хранить как угодно. Я пришел к тому, что хранить в одном файле роли, их полиÑи_доки и Ñами объекты инфраÑтруктуры, где Ñти полиÑи и -роли применÑÑŽÑ‚ÑÑ ÐºÑƒÐ´Ð° удобнее, чем отдельный модуль Ñ Ð¿Ð¾Ð»Ð¸Ñи и отдельный Ñ Ñ€Ð¾Ð»Ñми. +роли применÑÑŽÑ‚ÑÑ ÐºÑƒÐ´Ð° удобнее, чем отдельный модуль Ñ Ð¿Ð¾Ð»Ð¸Ñи и отдельный Ñ Ñ€Ð¾Ð»Ñми. За Ñто озарение ÑпаÑибо [Peter Zinin](skype:live:8422?chat). -С одной Ñтороны Ð¿Ð¾Ð´Ð¾Ð±Ð½Ð°Ñ Ñвобода в напиÑании проекта Ñто удобно. Ðо когда проект разраÑтаетÑÑ, то без должной Ñтруктуры проект превразатÑÑ Ð² кашу. +С одной Ñтороны Ð¿Ð¾Ð´Ð¾Ð±Ð½Ð°Ñ Ñвобода в напиÑании проекта Ñто удобно. Ðо когда проект разраÑтаетÑÑ, то без должной Ñтруктуры проект превращаетÑÑ Ð² кашу. -Чтобы Ñтого избежать лучше вÑего логичеÑки разделÑть отдельные чаÑти инфраÑтруктуры и пиÑать в отдельных файлах вÑÑ‘ необходимое именно -Ð´Ð»Ñ Ñтого Ñлемента, таким образом, чтобы они не завиÑили друг от друга. +Чтобы Ñтого избежать, лучше вÑего логичеÑки разделÑть отдельные чаÑти инфраÑтруктуры и пиÑать в отдельных файлах вÑÑ‘ необходимое Ð´Ð»Ñ ÐºÐ¾Ð½ÐºÑ€ÐµÑ‚Ð½Ð¾Ð³Ð¾ Ñлемента, таким образом, чтобы минимизировать линки между файлами. Как по мне, так пиÑать в одном файле вообще не Ð»ÑƒÑ‡ÑˆÐ°Ñ Ð·Ð°Ñ‚ÐµÑ, так что, даже еÑли проект небольшой, то лучше вÑÑ‘ равно разделÑть его на какие-то чаÑти. @@ -322,7 +330,7 @@ resource "aws_iam_role_policy_attachment" "firehose_iam_role_policy_attachment" Хоть и показано, что `SQS` привÑзан к топику `SNS` требуетÑÑ Ð²Ñ‹Ð´Ð°Ñ‡Ð° определенных прав. 
Добавьте в конфигурацию террафаорм Ñледующее: -```hcl-terraform +```terraform # Добавление прав публикации к очереди. resource "aws_sqs_queue_policy" "sqs_publish_policy" { @@ -363,7 +371,7 @@ data "aws_iam_policy_document" "sqs_send_message_policy_doc" { Попробуйте поменÑть `aws_iam_policy_document`, Ñодержащий `actions: [lambda:InvokeFunction]` на Ñледующий куÑок кода: -```hcl-terraform +```terraform # ПодÑтавьте Ñюда Ñвои Ð½Ð°Ð·Ð²Ð°Ð½Ð¸Ñ Ð»Ñмбды и топика. resource "aws_lambda_permission" "sns_allow_lambda_permission" { @@ -379,6 +387,171 @@ resource "aws_lambda_permission" "sns_allow_lambda_permission" { <img src="./images/lambda_sns_trigger.png"> </div> +## Ðе могу переиÑпользовать бакет S3 + +Ð”Ð»Ñ Ð¿ÐµÑ€ÐµÐ¸ÑÐ¿Ð¾Ð»ÑŒÐ·Ð¾Ð²Ð°Ð½Ð¸Ñ ÑƒÐ¶Ðµ ÑущеÑтвующего бакета s3 или любого другого объекта инфраÑтруктуры, перед +`terraform apply` Ñледует вручную обновить информацию о его ÑоÑтоÑнии в `terraform.tfstate`. +Ð”Ð»Ñ Ñтого выполните команду в терминале: +```bash +# Путь до объекта в terraform конфигах Уникальный идентификатор в aws. +terraform import module.services.aws_s3_bucket.bucket "bucket-name" +``` +Как импортировать другие объекты можно прочитать в доках терраформ в разделе Import. + +## Что-то Ñ Ð½Ðµ понÑл, а как работать Ñ ASG и EC2 +Auto scaling group штука довольно Ð½ÐµÐ¾Ð±Ñ‹Ñ‡Ð½Ð°Ñ Ð¸ очень удобнаÑ, Ñ Ð¾Ð´Ð½Ð¾Ð¹ Ñтороны. C другой Ñтороны, имеет + некоторые моменты, которые Ñтоит проÑÑнить прежде чем работать Ñ Ð½Ð¸Ð¼. + +Давайте примерно разберем архитектуру ASG в данном проекте. +Ðа картинке ниже предÑтавлена архитектура ASG Ñо второго Ð·Ð°Ð´Ð°Ð½Ð¸Ñ Ñ Ð½ÐµÐºÐ¾Ñ‚Ð¾Ñ€Ñ‹Ð¼Ð¸ допущениÑми, которые мы разберем позднее. +<div align="center"> + <img src="./images/asg-architecture.png"> +</div> + + ### Scale up and Scale down policies + + Ðачнем Ñ Ñ‚Ð¾Ð³Ð¾, что `Auto Scaling Group` не должен Ñам выбирать, когда добавлÑть новый `EC2 instance` и когда его оÑтанавливать, он проÑто умеен Ñто делать. 
Ð”Ð»Ñ Ð¾Ñ‚ÑÐ»ÐµÐ¶Ð¸Ð²Ð°Ð½Ð¸Ñ ÑоÑтоÑний нам потребуетÑÑ `CloudWatch` и его `Alarms`. Работают они крайне тупо, но Ñффективно. Ðа диаграмме Ñверху Ñ Ð¿Ð¾Ð¼ÐµÑ‚Ð¸Ð» ÑвÑзь между алармой и SQS как + `Observing metrics` потому что именно он будет нашим тригером. Ð’ данном Ñлучае `CloudWatch` через некоторые промежутки времени запращивает у `SQS` какие-нибудь метрики и передает их в Ñвои алармы. Ð˜Ð´ÐµÑ Ð² том, что у `Alarm` еÑть некоторый триггер и дейÑтвие, которое произойдет, когда уÑловие тригера Ñработает. + + Вот пример конфигурации Alarm отноÑительно количеÑтва Ñообщений в SQS: + ```terraform + resource "aws_cloudwatch_metric_alarm" "sqs-asg-metric-alarm" { + alarm_name = "SQS-message-alarm" + comparison_operator = "GreaterThanOrEqualToThreshold" + evaluation_periods = "1" + metric_name = "ApproximateNumberOfMessagesVisible" + namespace = "AWS/SQS" + period = "60" + statistic = "SampleCount" + threshold = "5" + + dimensions = { + QueueName = aws_sqs_queue.s3rius_sqs_queue.name + } + + alarm_description = "This metric monitors SQS messages and fire ec2 from asg" + alarm_actions = [ + aws_autoscaling_policy.scale_up_policy.arn + ] + } + ``` + + ЕÑли одновременно в очереди виÑит больше 5 Ñообщений то вызываетÑÑ `aws_autoscaling_policy.scale_up_policy`, где опиÑано Ð´Ð¾Ð±Ð°Ð²Ð»ÐµÐ½Ð¸Ñ Ð¾Ð´Ð½Ð¾Ð³Ð¾ инÑтанÑа. + Также в проекте еÑть второй `aws_autoscaling_policy`, который дропает `EC2`, когда инÑÑ‚Ð°Ð½Ñ 4 минуты подрÑд + проÑтаивает Ñ Ð·Ð°Ð³Ñ€ÑƒÐ·ÐºÐ¾Ð¹ CPU ниже 5%. + <div align="center"> + <img src="./images/asg-meme.png"> + </div> + Замечу, что `action` у аларма - Ñто проÑто arn какого-нибудь aws_autoscaling_policy. Ð’ данном Ñлучае полиÑи проÑто триггернетÑÑ Ð¸ Ñам решит что делать. Вообще в alarm можно вÑтраивать не только тупые Ñтандартные проверочки, но и целые вычиÑлениÑ. + + ### Я поднимаю EC2, а как обрабатывать ÑÐ¾Ð¾Ð±Ñ‰ÐµÐ½Ð¸Ñ SQS? + Очень проÑто. Сам EC2 - Ñто проÑто мелкий Ñервер, который работает как твой локальный комп. 
У него еÑть user-commands, которые выполнÑÑŽÑ‚ÑÑ Ð¿Ñ€Ð¸ запуÑке. ПроÑто прокинь туда запуÑк Ñвоего Ð¿Ñ€Ð¸Ð»Ð¾Ð¶ÐµÐ½Ð¸Ñ Ð½Ð° любом Ñзыке, которое будет Ñлушать SQS и обрабатывать ÑÐ¾Ð¾Ð±Ñ‰ÐµÐ½Ð¸Ñ Ð² вечном лупе. ЕÑли что, `AutoSclaingGroup` Ñам дропнет инÑтанÑ, который Ñлишком долго работает ничего не обрабатываÑ. + + Бтв, еÑли не наÑтроил `AutoScalingPolicies`, риÑкуешь влететь на монетки. Ðе очень большие, но вÑÑ‘ равно не очень приÑтно. Также замечу, что беÑплатный тип инÑтанÑа `EC2` - Ñто `t2.micro`, а **ÐЕ** `t2.nano`. + + ### Я понÑл, как пиÑать user-commands, а как загрузить приложение на EC2? + Скомпиль локально или проÑто заверни в zip-файл и через `terraform` загрузи в какой-нибудь `s3-bucket`. + Вот пример такой загрузки: + ```terraform + // bucket - ведро, куда подгрузить файл приложениÑ; + // key - путь в s3 до файла; + // source - путь до локального файла. + resource "aws_s3_bucket_object" "application_data" { + bucket = aws_s3_bucket.asg_meta_bucket.bucket + key = var.ec2_application_s3_key + source = var.ec2_application_bin + } + ``` + ПоÑле Ñ€Ð°Ð·Ð²ÐµÑ€Ñ‚Ñ‹Ð²Ð°Ð½Ð¸Ñ Ð¸Ð½Ñ„Ñ€Ð°Ñтруктуры у Ñ‚ÐµÐ±Ñ Ð±ÑƒÐ´ÐµÑ‚ объект в s3-bucket Ñ Ñамого начала. Ðикакой ручной загрузки. + Рв Ñамом инÑтанÑе внутри user-command Ñкачай файл через aws-cli и разархивируй его. + ```bash + aws s3 cp "s3://${meta_bucket}/${application_key}" "application.zip" + unzip application.zip + ``` + Рвот дальше запуÑкай как знаешь. + + ### Я разархиаировал приложение, но Ð´Ð»Ñ Ð·Ð°Ð¿ÑƒÑка мне нужны Ñторонние тулы/Ñзыки, которых нет + EC2 - Ð¾Ð±Ñ‹Ñ‡Ð½Ð°Ñ `unix`'Ð¾Ð²Ð°Ñ Ð¼Ð°ÑˆÐ¸Ð½ÐºÐ°. Ру Ñтих ребÑÑ‚ еÑть пакетные менеджеры и прочие штуки, чтобы + удобно что-нибудь уÑтановить. У AmazonLinux, который ты, Ñкорее вÑего выбрал, Ñтоит yum (Ðто как у fedora, еÑли шаришь, еÑли нет, то зырк [Ñюды](https://habr.com/ru/post/301292/)). + + Рвот Ð´Ð»Ñ Ñ€Ð°Ð±Ð¾Ñ‚Ñ‹ `YUM` нужен выход в тырнетики, чтобы он мог вÑÑ‘ Ñкачать. Ð”Ð»Ñ Ñтого решений парочка. 
+ Ðапример: + * Развернуть `VPC`; + * ÐаÑтроить `aws_internet_gateway` и подключить его к `VPC`. + * Создать `aws_subnet`, Ñ `cidr_block = "10.1.1.0/24"` + * Подрубить баланÑировщик и вÑе инÑтанÑÑ‹ к Ñтой Ñетке. + + Ðо Ñто охрененно Ñложно, как Ñ Ð´ÑƒÐ¼Ð°ÑŽ. ПоÑтому предлагаю вариант, который + почему-то не оÑвещаетÑÑ Ð½Ð° первых Ñтраницах гугла. (ЧертовÑки подозрительно, кÑтати); + + Создадим штуку, ÐºÐ¾Ñ‚Ð¾Ñ€Ð°Ñ Ð±ÑƒÐ´ÐµÑ‚ пуÑкать траффик куда угодно по любому порту. + в `aws_security_group` `egress` - Ñто иÑходÑщий траффик, `ingress` - входÑщий. + ```terraform + resource "aws_security_group" "outbound_traffic_security" { + name = "outbound-ec2" + + # Allow all outbound + egress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = [ + "0.0.0.0/0" + ] + } + + # Inbound HTTP from anywhere + ingress { + from_port = var.ec2_port + to_port = var.ec2_port + protocol = "tcp" + cidr_blocks = [ + "0.0.0.0/0" + ] + } +} +``` + +Рпотом в Ñвоем `aws_launch_configuration` добавь: +```terraform + security_groups = [ + aws_security_group.inbound_traffic_security.id, + aws_security_group.outbound_traffic_security.id + ] +``` + +Теперь можно Ñмело Ñкачивать хоть веÑÑŒ интернет. Ðо еÑли хочетÑÑ Ñ‡ÐµÐ³Ð¾-то конкретного поÑтавить. ИÑпользуй пакетный менеджер или `wget`. Ðапример так: +```bash +yum update -y +yum install <package> +``` +Чтоб узнать еÑть ли какой-нибудь пакет в `AmazonLinux` репозиториÑÑ… поÑмотри [полный ÑпиÑок доÑтупных пакетов](https://aws.amazon.com/ru/amazon-linux-ami/2018-03-packages/). + +### Как отправлÑть логи ec2 в CloudWatch? + <div align="center"> + <img src="./images/ec2-logging-meme.jpg"> + </div> + + Логгирование в stdout на ec2 не вариант, еÑли не знать как. Так-то вÑе логи пишутÑÑ Ð² systemlog, который можно почитать поÑле Ð²Ñ‹ÐºÐ»ÑŽÑ‡ÐµÐ½Ð¸Ñ Ð¸Ð½ÑтанÑа. Ðо Ñто неудобно. ПоÑтому предлагаю наÑтроить user-command так, чтобы веÑÑŒ лог из stdout запиÑывалÑÑ Ð² каÑтомный файл. 
+ ```bash + #!/bin/bash + LOG_FILE="/var/log/my_logs.log" + + function main() { + # Your awesome logic + } + +exec > >(tee "$LOG_FILE" | logger -t user-data -s 2>/dev/console) 2>&1 + echo "################# INSTANCE STARTUP #################" + main + ``` + Потом Ñпокойно подключаемÑÑ Ð¿Ð¾ ssh и в реальном времени читаем логи Ñ Ð¿Ð¾Ð¼Ð¾Ñ‰ÑŒÑŽ: + ```bash + tail -f "/var/log/my_logs.log" + ``` + Также еÑть вариант подключить CloudWatch агента. С Ñтим Ñ Ð¾Ñобо не разбиралÑÑ, но еÑли интереÑно, то + [Ñмотреть Ñюда](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/install-CloudWatch-Agent-commandline-fleet.html). + # Три закона робототехники Ðйзека Ðзимова * Робот не может причинить вред человеку или Ñвоим бездейÑтвием допуÑтить, чтобы человеку был причинён вред. * Робот должен повиноватьÑÑ Ð²Ñем приказам, которые даёт человек, кроме тех Ñлучаев, когда Ñти приказы противоречат Первому Закону. diff --git a/p.kirilin/images/asg-architecture.png b/p.kirilin/images/asg-architecture.png new file mode 100644 index 0000000000000000000000000000000000000000..740a184e699d08f189c84de92956a484d2f91c65 Binary files /dev/null and b/p.kirilin/images/asg-architecture.png differ diff --git a/p.kirilin/images/asg-meme.png b/p.kirilin/images/asg-meme.png new file mode 100644 index 0000000000000000000000000000000000000000..85a717b08d3f5f3d3c6d5f82dbd2f6c069e07e04 Binary files /dev/null and b/p.kirilin/images/asg-meme.png differ diff --git a/p.kirilin/images/ec2-logging-meme.jpg b/p.kirilin/images/ec2-logging-meme.jpg new file mode 100644 index 0000000000000000000000000000000000000000..e45f3429c7c04ef4f3209496edc19832acc7e384 Binary files /dev/null and b/p.kirilin/images/ec2-logging-meme.jpg differ diff --git a/p.kirilin/lambda_models/.gitignore b/p.kirilin/lambda_models/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..e0cf6a8c849a3bdc5eba3cd69b129b6b4dcec652 --- /dev/null +++ b/p.kirilin/lambda_models/.gitignore @@ -0,0 +1,3 @@ +.idea/ +target/ 
+Cargo.lock \ No newline at end of file diff --git a/p.kirilin/task.1/lambdas/lambda_models/Cargo.toml b/p.kirilin/lambda_models/Cargo.toml similarity index 61% rename from p.kirilin/task.1/lambdas/lambda_models/Cargo.toml rename to p.kirilin/lambda_models/Cargo.toml index 9868fe1114ead4af44e79e09d741b72c44ef86bd..f080b713a61fe7056d100d141355af340eed9c0a 100644 --- a/p.kirilin/task.1/lambdas/lambda_models/Cargo.toml +++ b/p.kirilin/lambda_models/Cargo.toml @@ -6,12 +6,30 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +parsing_parquet = ["parquet", "tokio"] + + [dependencies] log = "0.4.8" serde_json = "1.0" rand = "0.7" + [dependencies.serde] version = "1.0" features = [ "derive" ] + +[dependencies.parquet] +version = "0.15.1" +optional = true + +[dependencies.tokio] +version = "0.2" +default_features = false +optional = true +features = [ + "fs" +] + diff --git a/p.kirilin/task.1/lambdas/lambda_models/src/data_examples/weather_data_input.json b/p.kirilin/lambda_models/src/data_examples/weather_data_input.json similarity index 100% rename from p.kirilin/task.1/lambdas/lambda_models/src/data_examples/weather_data_input.json rename to p.kirilin/lambda_models/src/data_examples/weather_data_input.json diff --git a/p.kirilin/task.1/lambdas/lambda_models/src/data_examples/weather_data_output.json b/p.kirilin/lambda_models/src/data_examples/weather_data_output.json similarity index 100% rename from p.kirilin/task.1/lambdas/lambda_models/src/data_examples/weather_data_output.json rename to p.kirilin/lambda_models/src/data_examples/weather_data_output.json diff --git a/p.kirilin/task.1/lambdas/lambda_models/src/lib.rs b/p.kirilin/lambda_models/src/lib.rs similarity index 100% rename from p.kirilin/task.1/lambdas/lambda_models/src/lib.rs rename to p.kirilin/lambda_models/src/lib.rs diff --git a/p.kirilin/task.1/lambdas/lambda_models/src/locator.rs b/p.kirilin/lambda_models/src/locator.rs 
similarity index 100% rename from p.kirilin/task.1/lambdas/lambda_models/src/locator.rs rename to p.kirilin/lambda_models/src/locator.rs diff --git a/p.kirilin/task.1/lambdas/lambda_models/src/weather_data_input.rs b/p.kirilin/lambda_models/src/weather_data_input.rs similarity index 100% rename from p.kirilin/task.1/lambdas/lambda_models/src/weather_data_input.rs rename to p.kirilin/lambda_models/src/weather_data_input.rs diff --git a/p.kirilin/task.1/lambdas/lambda_models/src/weather_data_output.rs b/p.kirilin/lambda_models/src/weather_data_output.rs similarity index 54% rename from p.kirilin/task.1/lambdas/lambda_models/src/weather_data_output.rs rename to p.kirilin/lambda_models/src/weather_data_output.rs index b425bd7882f9349b6dba2f971f12bcec3ba7038d..9412a6b822769a5d8f4722b0f99e522d55c9b287 100644 --- a/p.kirilin/task.1/lambdas/lambda_models/src/weather_data_output.rs +++ b/p.kirilin/lambda_models/src/weather_data_output.rs @@ -32,6 +32,41 @@ impl From<WeatherDataInput> for WeatherDataOutput { } } +#[cfg(feature = "parsing_parquet")] +pub mod parse_parquet { + use super::WeatherDataOutput; + use tokio::fs::File; + use std::path::Path; + use parquet::file::reader::{SerializedFileReader, FileReader}; + use parquet::record::RowAccessor; + use std::error::Error; + + impl WeatherDataOutput { + pub async fn from_parquet_file(file_name: String) -> Result<Vec<Self>, Box<dyn Error + Send + Sync + 'static>> { + let parquet_file = File::open(&Path::new(file_name.as_str())) + .await? + .into_std() + .await; + let reader = SerializedFileReader::new(parquet_file).unwrap(); + let iter = reader.get_row_iter(None)?; + let mut results = Vec::new(); + for record in iter { + results.push(Self { + timestamp: record.get_long(0)? as usize, + coords: record.get_long(1)? 
as usize, + temperature: record.get_long(2)?, + humidity: record.get_long(3)?, + pressure: record.get_long(4)?, + city: record.get_string(5)?.clone(), + country: record.get_string(6)?.clone(), + ip_addr: record.get_string(7)?.clone(), + }); + } + Ok(results) + } + } +} + #[must_use] pub fn get_firehose_output_example() -> String { include_str!("data_examples/weather_data_output.json").to_string() @@ -41,7 +76,7 @@ pub fn get_firehose_output_example() -> String { mod tests { use crate::{StationTypes, WeatherDataInput}; use super::WeatherDataOutput; - + #[test] pub fn test_mapping_inputs() { let input = WeatherDataInput { diff --git a/p.kirilin/task.1/aws_services/fire_lambda.tf b/p.kirilin/task.1/aws_services/fire_lambda.tf index d9b4265b90e374dbe7043a61f9111312e6cfaa11..ae6689fbf865f539bb5a114fcce45a1d7c296453 100644 --- a/p.kirilin/task.1/aws_services/fire_lambda.tf +++ b/p.kirilin/task.1/aws_services/fire_lambda.tf @@ -1,11 +1,13 @@ # Firehose lambda declaration resource "aws_lambda_function" "firehorse_lambda" { - filename = var.firehose_lambda_file - function_name = var.firehose_lambda_name - handler = "Provided" - role = aws_iam_role.fire_lambda_role.arn - runtime = "provided" - timeout = 3 + source_code_hash = filebase64sha256(var.firehose_lambda_file) + role = aws_iam_role.fire_lambda_role.arn + function_name = var.firehose_lambda_name + filename = var.firehose_lambda_file + memory_size = var.lambda_memory_size + timeout = var.lambda_timeouts + handler = "Provided" + runtime = "provided" environment { variables = { @@ -17,7 +19,6 @@ resource "aws_lambda_function" "firehorse_lambda" { } resource "aws_iam_role" "fire_lambda_role" { - name = var.firehose_lambda_name assume_role_policy = data.aws_iam_policy_document.fire_lambda_role_policy_doc.json } diff --git a/p.kirilin/task.1/aws_services/firehose.tf b/p.kirilin/task.1/aws_services/firehose.tf index d62d074f0987ec2537db7c197016b9206be83aa5..a6fcc7785de49ed96374b0b3bbe29df5c13e6346 100644 --- 
a/p.kirilin/task.1/aws_services/firehose.tf +++ b/p.kirilin/task.1/aws_services/firehose.tf @@ -88,6 +88,7 @@ data "aws_iam_policy_document" "firehose_role_policy_doc" { } data "aws_iam_policy_document" "fire_extra_policy_doc" { + version = "2012-10-17" # allow to write logs statement { effect = "Allow" @@ -120,7 +121,6 @@ data "aws_iam_policy_document" "fire_extra_policy_doc" { statement { effect = "Allow" actions = [ - "sts:AssumeRole", "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ] diff --git a/p.kirilin/task.1/aws_services/s3.tf b/p.kirilin/task.1/aws_services/s3.tf index b1ce4c01862810658ff7698f24b468676769d0ba..93f32eeb1a85176c06ba09af2f28bcc74ebe1dd9 100644 --- a/p.kirilin/task.1/aws_services/s3.tf +++ b/p.kirilin/task.1/aws_services/s3.tf @@ -1,4 +1,5 @@ resource "aws_s3_bucket" "bucket" { - bucket = "s3rius-weather-bucket" - acl = "private" + bucket = var.s3_bucket_name + acl = "private" + force_destroy = true } \ No newline at end of file diff --git a/p.kirilin/task.1/aws_services/sns_lambda.tf b/p.kirilin/task.1/aws_services/sns_lambda.tf index a84b35eb90aba9aebdf8dc60794b588505082ef2..d5169f6c820c7ab7866fdb3dbd94552cb99863e9 100644 --- a/p.kirilin/task.1/aws_services/sns_lambda.tf +++ b/p.kirilin/task.1/aws_services/sns_lambda.tf @@ -1,10 +1,12 @@ resource "aws_lambda_function" "sns_lambda" { - filename = var.sns_lambda_file - function_name = var.sns_lambda_name - handler = "Provided" - role = aws_iam_role.sns_lambda_role.arn - runtime = "provided" - timeout = 3 + source_code_hash = filebase64sha256(var.firehose_lambda_file) + role = aws_iam_role.sns_lambda_role.arn + memory_size = var.lambda_memory_size + function_name = var.sns_lambda_name + filename = var.sns_lambda_file + timeout = var.lambda_timeouts + handler = "Provided" + runtime = "provided" environment { variables = { diff --git a/p.kirilin/task.1/aws_services/variables.tf b/p.kirilin/task.1/aws_services/variables.tf index 
4f5593a8a4262560198ee892fce504ceca4ad78a..69282b96ad2b82a82582fa9f4df440def2d1c561 100644 --- a/p.kirilin/task.1/aws_services/variables.tf +++ b/p.kirilin/task.1/aws_services/variables.tf @@ -1,3 +1,7 @@ +variable "s3_bucket_name" { + type = string +} + variable "firehose_role_name" { type = string } @@ -26,3 +30,13 @@ variable "sns_lambda_file" { variable "glue_role_name" { type = string } + +variable "lambda_timeouts" { + type = number + default = 10 +} + +variable "lambda_memory_size" { + type = number + default = 128 +} \ No newline at end of file diff --git a/p.kirilin/task.1/build.sh b/p.kirilin/task.1/build.sh index a1b52e6c5995dc46c20d2dffa451ffdf73a4169c..83d530f11376de78019f71f3c73a3f5b2a0321c4 100755 --- a/p.kirilin/task.1/build.sh +++ b/p.kirilin/task.1/build.sh @@ -49,6 +49,11 @@ function main(){ terraform apply } +# Export s3 bucket name to be same as in second task. +if [[ -n "$1" ]]; then + export TF_VAR_s3_bucket_name=$1 +fi + # Remove packaged lambdas on exit trap rm_lambdas EXIT TERM main diff --git a/p.kirilin/task.1/fire_publisher/Cargo.toml b/p.kirilin/task.1/fire_publisher/Cargo.toml index dfc63efbe7eeb285103b633d7938133c72d7beed..e6218cb93c86efa50cd95956cc2223b4eb6b59de 100644 --- a/p.kirilin/task.1/fire_publisher/Cargo.toml +++ b/p.kirilin/task.1/fire_publisher/Cargo.toml @@ -17,7 +17,7 @@ serde_json = "1.0" env_logger = "0.7.1" rusoto_firehose = "0.43" rusoto_signature = "0.43.0" -lambda_models = {path = "../lambdas/lambda_models"} +lambda_models = {path = "../../lambda_models"} [dependencies.tokio] version = "0.2.4" diff --git a/p.kirilin/task.1/fire_publisher/Makefile b/p.kirilin/task.1/fire_publisher/Makefile index 7ebe6248f2c71d3e7453bbb8da2b208839eb9749..3b217899d5e2c430929ab5e80f728f81e9c9415f 100644 --- a/p.kirilin/task.1/fire_publisher/Makefile +++ b/p.kirilin/task.1/fire_publisher/Makefile @@ -1,4 +1,8 @@ all: run run: +ifdef num + cargo run -- ${num} +else cargo run +endif \ No newline at end of file diff --git 
a/p.kirilin/task.1/fire_publisher/src/main.rs b/p.kirilin/task.1/fire_publisher/src/main.rs index a04b90a203cacdc35a88a65073e6f5783f6410bf..da224ee19b312bc4d859b5248ed580ad8bd19198 100644 --- a/p.kirilin/task.1/fire_publisher/src/main.rs +++ b/p.kirilin/task.1/fire_publisher/src/main.rs @@ -10,14 +10,20 @@ use rusoto_firehose::{ use rusoto_signature::Region; use std::error::Error; use std::time::Duration; +use std::str::FromStr; #[tokio::main] pub async fn main() { dotenv::dotenv().ok(); + let mut messages_quantity = 10; + if let Some(quantity) = std::env::args().nth(1) { + println!("Desired messages quantity {}", quantity); + messages_quantity = i32::from_str(quantity.as_str()).expect("Not a number"); + } env_logger::init_from_env(Env::default().default_filter_or("debug")); let client = rusoto_firehose::KinesisFirehoseClient::new(Region::EuCentral1); let delivery_stream_name = String::from("kinesis-s3rius-fire-stream"); - for _ in 0..10 { + for _ in 0..messages_quantity { let message = publish_message(&client, delivery_stream_name.clone()).await; match message { Ok(out) => { @@ -26,7 +32,7 @@ pub async fn main() { debug!("Encrypted: {:#?}", out.encrypted); } Err(err) => { - error!("Message wasn't send"); + error!("Message wasn't sent"); error!("{:#?}", &err.to_string()) } } diff --git a/p.kirilin/task.1/instance.tf b/p.kirilin/task.1/instance.tf index bd96adeefd92a758a3ddb556246152e3ac580834..5a12b50e85244eb79205eb651d2cd966b4bd3843 100644 --- a/p.kirilin/task.1/instance.tf +++ b/p.kirilin/task.1/instance.tf @@ -6,6 +6,9 @@ provider "aws" { module "services" { source = "./aws_services" + // S3 variables + s3_bucket_name = var.s3_bucket_name + // Firehose variables firehose_lambda_name = "s3rius_fire_lambda" firehose_lambda_file = var.firehose_lambda_function diff --git a/p.kirilin/task.1/lambdas/firehose_lambda_function/Cargo.toml b/p.kirilin/task.1/lambdas/firehose_lambda_function/Cargo.toml index 
58f721671f35411b5b7f1429223f12d4c0a2c674..690194c4a0beb7cdef25a53e50b8e978e0132cb3 100644 --- a/p.kirilin/task.1/lambdas/firehose_lambda_function/Cargo.toml +++ b/p.kirilin/task.1/lambdas/firehose_lambda_function/Cargo.toml @@ -25,7 +25,9 @@ dotenv = "0.15" serde_json = "1.0" env_logger = "0.7" aws_lambda_events = "=0.2.5" -lambda_models = {path = "../lambda_models"} + +[dependencies.lambda_models] +path = "../../../lambda_models" # Use sources from git because, the version in crates is # out of date and won't be updated any soon, as i understand. diff --git a/p.kirilin/task.1/lambdas/firehose_lambda_function/src/processors.rs b/p.kirilin/task.1/lambdas/firehose_lambda_function/src/processors.rs index 75d1a94b23bff3ea9eede69fe96f273994579c00..1c657d467f2fdeed56b104567645add3435ac69d 100644 --- a/p.kirilin/task.1/lambdas/firehose_lambda_function/src/processors.rs +++ b/p.kirilin/task.1/lambdas/firehose_lambda_function/src/processors.rs @@ -22,10 +22,16 @@ use std::str::FromStr; /// /// Will return `Err` if any problem occur during processing. pub async fn process(event: KinesisFirehoseEvent) -> Result<KinesisFirehoseResponse, LambdaError> { - info!("Handler started"); + info!("Lambda ignition"); let mut processed_events = Vec::new(); for record in event.records { - processed_events.push(process_input(record).await?) + match process_input(record).await { + Ok(response) => { + info!("Data processed successfully."); + processed_events.push(response) + } + Err(err) => warn!("Can't process data. 
Cause: {}", err.to_string()), + } } Ok(KinesisFirehoseResponse { records: processed_events, @@ -55,25 +61,33 @@ pub async fn process(event: KinesisFirehoseEvent) -> Result<KinesisFirehoseRespo async fn process_input( record: KinesisFirehoseEventRecord, ) -> Result<KinesisFirehoseResponseRecord, LambdaError> { - let weather_json = String::from_utf8(record.data.to_vec())?; + let processing_failed = KinesisFirehoseResponseRecord { + record_id: record.record_id.clone(), + result: Some(String::from("ProcessingFailed")), + data: record.data.clone(), + }; + let weather_json = String::from_utf8(record.data.to_vec()); + if weather_json.is_err() { + return Ok(processing_failed); + } + let weather_json = weather_json.unwrap(); + info!("Received data: {}", weather_json); let weather_data = serde_json::from_str::<WeatherDataInput>(weather_json.as_str()); + if let Err(parse_err) = weather_data { error!( "Found error while parsing JSON data: {}", parse_err.to_string() ); - return Ok(KinesisFirehoseResponseRecord { - record_id: record.record_id, - result: Some(String::from("ProcessingFailed")), - data: record.data, - }); + return Ok(processing_failed); } let weather_data = weather_data.unwrap(); match weather_data.station_type { StationTypes::AUTHORIZED => { let authorized_output = WeatherDataOutput::from(weather_data.clone()); let json_output = serde_json::to_vec(&authorized_output)?; + debug!("Data is OK and AUTHORIZED"); Ok(KinesisFirehoseResponseRecord { record_id: record.record_id, result: Some(String::from("Ok")), @@ -82,19 +96,20 @@ async fn process_input( } StationTypes::UNAUTHORIZED => { let authorized_output = WeatherDataOutput::from(weather_data.clone()); - publish_to_sns(authorized_output.clone()).await?; + if publish_to_sns(authorized_output.clone()).await.is_err() { + warn!("Can't publish message to sns"); + return Ok(processing_failed); + } + let json_output = serde_json::to_vec(&authorized_output)?; + debug!("Data is OK but UNAUTHORIZED"); 
Ok(KinesisFirehoseResponseRecord { record_id: record.record_id, result: Some(String::from("Dropped")), data: Base64Data(json_output), }) } - StationTypes::UNKNOWN => Ok(KinesisFirehoseResponseRecord { - record_id: record.record_id, - result: Some(String::from("ProcessingFailed")), - data: record.data, - }), + StationTypes::UNKNOWN => Ok(processing_failed), } } @@ -117,7 +132,7 @@ pub async fn publish_to_sns(output: WeatherDataOutput) -> Result<(), LambdaError let sns_client = SnsClient::new(Region::from_str(current_region.as_str()).map_err(|err| { std::io::Error::new( ErrorKind::Other, - format!("Can't connect to sns. {}", err.to_string()), + format!("Wrong region. {}", err.to_string()), ) })?); debug!("Created SNS client"); diff --git a/p.kirilin/task.1/lambdas/sns_lambda_function/Cargo.toml b/p.kirilin/task.1/lambdas/sns_lambda_function/Cargo.toml index 1b9307c4da7db1d28e8cd0f798a0e52f37ffe5c4..d2f612fdea5591b2bc378d133111e4d3c3b40e83 100644 --- a/p.kirilin/task.1/lambdas/sns_lambda_function/Cargo.toml +++ b/p.kirilin/task.1/lambdas/sns_lambda_function/Cargo.toml @@ -16,7 +16,9 @@ dotenv = "0.15" serde_json = "1.0" env_logger = "0.7" aws_lambda_events = "=0.2.5" -lambda_models = {path = "../lambda_models"} + +[dependencies.lambda_models] +path = "../../../lambda_models" # Use sources from git because, the version in crates is # out of date and won't be updated any soon, as i understand. 
diff --git a/p.kirilin/task.1/lambdas/sns_lambda_function/Makefile b/p.kirilin/task.1/lambdas/sns_lambda_function/Makefile index 55c6c77296561f28830f20838c7e72bd1db83208..a140af21c42327cfa4bb2509a9f6e93667b08834 100644 --- a/p.kirilin/task.1/lambdas/sns_lambda_function/Makefile +++ b/p.kirilin/task.1/lambdas/sns_lambda_function/Makefile @@ -19,6 +19,8 @@ test: integration_test: build_lambda cat input_event_example.json | docker run --rm \ -e LOG_LEVEL='debug' \ + -e AWS_REGION='eu-central-1' \ + -e S3_BUCKET='test' \ -v "${SRC_DIR}/target/x86_64-unknown-linux-musl/release/":/var/task:ro,delegated \ -i -e DOCKER_LAMBDA_USE_STDIN=1 lambci/lambda:provided diff --git a/p.kirilin/task.1/lambdas/sns_lambda_function/src/processors.rs b/p.kirilin/task.1/lambdas/sns_lambda_function/src/processors.rs index 26eca6b8b7456ac0a27c85272b536187740c672a..76257e59bba9c5afc85ba6a7a71d71bf6b9c633e 100644 --- a/p.kirilin/task.1/lambdas/sns_lambda_function/src/processors.rs +++ b/p.kirilin/task.1/lambdas/sns_lambda_function/src/processors.rs @@ -15,12 +15,21 @@ use std::str::FromStr; /// # Errors /// /// Will return `LambdaError` if any problem occur during processing. +/// * `S3_BUCKET` env variable was not provided; +/// * `AWS_REGION` env variable was not found; +/// * `AWS_REGION` env variable is incorrect; pub async fn process(input: SnsEvent) -> Result<(), LambdaError> { info!("Lambda ignition"); + let current_region = std::env::var("AWS_REGION")?; + let s3_bucket = std::env::var("S3_BUCKET")?; + let region = Region::from_str(current_region.as_str())?; for record in input.records { debug!("Got event from {:#?}", record.event_source); debug!("Event version: {:#?}", record.event_version); - process_record(record.sns).await? + match process_record(record.sns, ®ion, s3_bucket.as_str()).await { + Ok(_) => info!("Message processed successfully"), + Err(err) => warn!("Can't process message. 
Cause: {}", err.to_string()), + } } Ok(()) } @@ -31,12 +40,13 @@ pub async fn process(input: SnsEvent) -> Result<(), LambdaError> { /// # Errors /// /// Function may fail in several cases: -/// * `S3_BUCKET` env variable was not provided; -/// * `AWS_REGION` env variable was not found; -/// * `AWS_REGION` env variable is incorrect; /// * Message can't be parsed to [`WeatherDataOutput`] /// * Publishing to S3 failed for some reason. -pub async fn process_record(record: SnsEntity) -> Result<(), LambdaError> { +pub async fn process_record( + record: SnsEntity, + region: &Region, + s3_bucket: &str, +) -> Result<(), LambdaError> { debug!( "Type: {:#?}", record.type_.unwrap_or_else(|| String::from("")) @@ -47,14 +57,13 @@ pub async fn process_record(record: SnsEntity) -> Result<(), LambdaError> { Ok(()) } Some(message_json) => { + let s3_client = rusoto_s3::S3Client::new(region.clone()); let weather_data: WeatherDataOutput = serde_json::from_str(message_json.as_str())?; - let current_region = std::env::var("AWS_REGION")?; - let s3_client = rusoto_s3::S3Client::new(Region::from_str(current_region.as_str())?); let mut request = PutObjectRequest::default(); - request.bucket = std::env::var("S3_BUCKET")?; + request.bucket = String::from(s3_bucket); request.body = Some(message_json.into_bytes().into()); request.key = format!( - "UNATHORIZED/{}/{}/{}.txt", + "UNAUTHORIZED/{}/{}/{}.txt", weather_data.country, weather_data.city, uuid::Uuid::new_v4().to_string() diff --git a/p.kirilin/task.1/variables.tf b/p.kirilin/task.1/variables.tf index 7ea4470818e9785970881192683d02618ad549a7..75362c6d0506d90cef72a35e85964bf236be55e4 100644 --- a/p.kirilin/task.1/variables.tf +++ b/p.kirilin/task.1/variables.tf @@ -7,3 +7,8 @@ variable "sns_lambda_function" { type = string default = "sns_lambda_function.zip" } + +variable "s3_bucket_name" { + type = string + default = "s3rius-weather-bucket" +} \ No newline at end of file diff --git a/p.kirilin/task.2/.gitignore 
b/p.kirilin/task.2/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..2f423a1e1ac0ea372d0adf97e44a8543578616d2 --- /dev/null +++ b/p.kirilin/task.2/.gitignore @@ -0,0 +1,4 @@ +.idea/ +.terraform +terraform.tfstate +terraform.tfstate.backup \ No newline at end of file diff --git a/p.kirilin/task.2/applications/.gitignore b/p.kirilin/task.2/applications/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..617f79897f217d208b50d79346aae9a3f3f62256 --- /dev/null +++ b/p.kirilin/task.2/applications/.gitignore @@ -0,0 +1,3 @@ +target/ +.idea/ +Cargo.lock \ No newline at end of file diff --git a/p.kirilin/task.2/applications/ec2_sqs_listener/Cargo.toml b/p.kirilin/task.2/applications/ec2_sqs_listener/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..16a46b3e57492f6cc4059e6f38c6756cd18ba7bf --- /dev/null +++ b/p.kirilin/task.2/applications/ec2_sqs_listener/Cargo.toml @@ -0,0 +1,71 @@ +[package] +name = "ec2_sqs_listener" +version = "0.1.0" +authors = ["Pavel Kirilin <pavel.kirilin@simbirsoft.com>"] +edition = "2018" +autobins = false +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[[bin]] +name = "bootstrap" +path = "src/main.rs" + +[dependencies] +log = "0.4" +dotenv = "0.15" +serde_json = "1.0" + +[dependencies.lambda_models] +path = "../../../lambda_models" +features = [ + "parsing_parquet" +] + +[dependencies.uuid] +version = "0.8.1" +features = [ + "v4" +] + +[dependencies.tokio] +version = "0.2" +default_features = false +features = [ + "time", + "fs", + "blocking" +] + +[dependencies.rusoto_core] +version = "0.43" +default_features = false +features = [ + "rustls" +] + +[dependencies.rusoto_sqs] +version = "0.43" +default_features = false +features = [ + "rustls" +] + +[dependencies.rusoto_s3] +version = "0.43" +default_features = false +features = [ + "rustls" +] + +[dependencies.rusoto_sns] +version = "0.43" +default_features = 
false +features = [ + "rustls" +] + +[dependencies.fern] +version = "0.6.0" +features = [ + "colored" +] \ No newline at end of file diff --git a/p.kirilin/task.2/applications/ec2_sqs_listener/Makefile b/p.kirilin/task.2/applications/ec2_sqs_listener/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..04d68e8a27463f154d2868ae1152b3a75e1a9027 --- /dev/null +++ b/p.kirilin/task.2/applications/ec2_sqs_listener/Makefile @@ -0,0 +1,32 @@ +SRC_DIR=$(shell pwd) + +# Default target +all: validate + +# Run formatting +fmt: + cargo fmt + +# Find some common vulnerabilities if any. +clippy: + cargo clippy --all-targets --all-features -- -D "clippy::pedantic" + +# Standard unit testing. +test: + cargo test + +# Run all checks +validate: fmt clippy test + +# Build binary with musl-gcc target (Required by AWS runtime) +build_lambda: + @rustup target add x86_64-unknown-linux-musl + cargo build --release --target x86_64-unknown-linux-musl + +# Package resulting binary in zip +# If zip_name variable defined then it'll pack bin in specific zip. 
+install: build_lambda +ifndef zip_name + $(eval zip_name=application) +endif + zip -j "${zip_name}.zip" target/x86_64-unknown-linux-musl/release/bootstrap diff --git a/p.kirilin/task.2/applications/ec2_sqs_listener/src/main.rs b/p.kirilin/task.2/applications/ec2_sqs_listener/src/main.rs new file mode 100644 index 0000000000000000000000000000000000000000..f4a63974bff19789a6283b5fcbb47d09242e7c4c --- /dev/null +++ b/p.kirilin/task.2/applications/ec2_sqs_listener/src/main.rs @@ -0,0 +1,55 @@ +#[macro_use] +extern crate log; +extern crate tokio; + +use fern::colors; +use log::LevelFilter; +use std::error::Error; +use std::str::FromStr; + +mod processor; + +type AppError = Box<dyn Error + Send + Sync + 'static>; + +/// Setting up logger at startup +fn setup_logger() -> Result<(), AppError> { + let log_level = std::env::var("LOG_LEVEL").unwrap_or_else(|_| String::from("INFO")); + let log_file = + std::env::var("LOG_FILE").unwrap_or_else(|_| String::from("/var/log/sqs_listener.log")); + + let colors = colors::ColoredLevelConfig::new() + .info(colors::Color::Green) + .warn(colors::Color::Magenta) + .debug(colors::Color::Blue) + .error(colors::Color::Red) + .trace(colors::Color::BrightRed); + fern::Dispatch::new() + .format(move |out, message, record| { + out.finish(format_args!( + "[{}] {}", + colors.color(record.level()), + message + )) + }) + .level(LevelFilter::from_str(log_level.as_str()).unwrap()) + .chain(std::io::stdout()) + .chain(fern::log_file(log_file)?) + .apply()?; + Ok(()) +} + +/// Main execution point. 
+/// In this function we setup connection +/// and start listening to SQS +/// +/// # Errors +/// `SQS_QUEUE_ID` env was not provided +/// `AWS_REGION` env was not provided +#[tokio::main] +async fn main() -> Result<(), AppError> { + dotenv::dotenv().ok(); + setup_logger()?; + + processor::process().await?; + Ok(()) +} diff --git a/p.kirilin/task.2/applications/ec2_sqs_listener/src/processor.rs b/p.kirilin/task.2/applications/ec2_sqs_listener/src/processor.rs new file mode 100644 index 0000000000000000000000000000000000000000..2de088b83dc1740cd938d3b3e2f515dbaf0be1c3 --- /dev/null +++ b/p.kirilin/task.2/applications/ec2_sqs_listener/src/processor.rs @@ -0,0 +1,286 @@ +use crate::AppError; +use lambda_models::WeatherDataOutput; +use rusoto_core::Region; +use rusoto_s3::{GetObjectRequest, S3}; +use rusoto_sns::{PublishInput, Sns, SnsClient}; +use rusoto_sqs::{DeleteMessageRequest, Message, ReceiveMessageRequest, Sqs, SqsClient}; +use std::str::FromStr; +use std::time::Duration; +use tokio::fs::File; +use tokio::prelude::*; +use tokio::time::delay_for; + +fn build_error(description: String) -> std::io::Error { + std::io::Error::new(std::io::ErrorKind::Other, description) +} + +/// ## Main processing function +/// This function initializing `SQSClient` +/// and start pooling for new messages. +/// +/// # Errors +/// * `SQS_QUEUE_ID` env was not found. +/// * `AWS_REGION` env was not found. +/// * `S3_BUCKET` env was not found. +/// * `AWS_REGION` is incorrect (Unknown AWS region). 
+pub async fn process() -> Result<(), AppError> { + let sqs_queue_id = std::env::var("SQS_QUEUE_ID") + .map_err(|_| build_error(String::from("SQS_QUEUE_ID was not provided")))?; + debug!("Found SQS queue ID: {}", sqs_queue_id.as_str()); + let aws_region = std::env::var("AWS_REGION") + .map_err(|_| build_error(String::from("AWS_REGION was not provided")))?; + debug!("Found AWS region: {}", aws_region.as_str()); + let s3_bucket = std::env::var("S3_BUCKET") + .map_err(|_| build_error(String::from("S3_BUCKET was not provided")))?; + debug!("S3 BUCKET: {}", s3_bucket.as_str()); + let sns_topic = std::env::var("SNS_TOPIC") + .map_err(|_| build_error(String::from("SNS_TOPIC was not provided")))?; + debug!("SNS TOPIC: {}", s3_bucket.as_str()); + + let aws_region = Region::from_str(aws_region.as_str()) + .map_err(|_| build_error(String::from("Incorrect region provided")))?; + + pool_messages(sqs_queue_id, s3_bucket, sns_topic, aws_region).await; + Ok(()) +} + +/// Request pending message from sqs +/// and process them. +/// All errors handled here to continue execution in any case. +/// # Parameters +/// * `sqs_queue_id`: `SQS` queue to poll for messages; +/// * `s3_bucket`: `S3` bucket to get files described in incoming messages; +/// * `sns_topic`: `SNS` topic to publish matched weather data; +/// * `aws_region`: current `AWS` region. +async fn pool_messages( + sqs_queue_id: String, + s3_bucket: String, + sns_topic: String, + aws_region: Region, +) { + info!("Start pooling message from SQS"); + loop { + info!("Sleeping for 2 seconds"); + delay_for(Duration::from_secs(2)).await; + let sqs_client = SqsClient::new(aws_region.clone()); + let messages = request_messages(&sqs_client, sqs_queue_id.as_str()).await; + if messages.is_err() { + warn!( + "Can't receive messages. 
Cause: {}", + messages.err().unwrap().to_string() + ); + continue; + } + for message in &messages.unwrap() { + let message = message.clone(); + let message_id = message + .message_id + .clone() + .unwrap_or_else(|| String::from("with unknown ID")); + let processing_res = process_message( + s3_bucket.as_str(), + sns_topic.as_str(), + &aws_region, + message.clone(), + ) + .await; + match processing_res { + Ok(_) => { + info!("Message {} processed successfully", message_id); + if let Err(error) = + delete_message(&sqs_client, sqs_queue_id.as_str(), message.clone()).await + { + warn!("Can't delete message from queue. {}", error.to_string()); + }; + } + Err(error) => error!( + "Can't process message: {:?}. Cause: {}", + message.message_id, + error.to_string() + ), + } + } + } +} + +/// Delete message from `SQS` queue. +/// +/// # Parameters +/// * `sqs_client`: Connection to `SQS`; +/// * `queue_id`: Link to queue; +/// * `message`: Message from SQS to delete. +/// +/// # Errors +/// +/// * Message receipt empty; +/// * Can't delete message from queue. +async fn delete_message( + sqs_client: &SqsClient, + queue_id: &str, + message: Message, +) -> Result<(), AppError> { + let mut delete_request = DeleteMessageRequest::default(); + delete_request.queue_url = String::from(queue_id); + if message.receipt_handle.is_none() { + return Err(Box::new(build_error(String::from( + "Unknown receipt handle in message", + )))); + } + delete_request.receipt_handle = String::from(message.receipt_handle.unwrap().as_str()); + sqs_client.delete_message(delete_request).await?; + Ok(()) +} + +/// Request new messages from SQS queue +/// +/// # Parameters +/// * `sqs_client`: Connection to `SQS`; +/// * `queue_id`: Link to queue; +/// +/// # Errors +/// * Can't receive messages; +/// * The `Queue` is empty. 
+async fn request_messages( + sqs_client: &SqsClient, + queue_id: &str, +) -> Result<Vec<Message>, AppError> { + let mut request = ReceiveMessageRequest::default(); + request.queue_url = String::from(queue_id); + request.max_number_of_messages = Some(10); + let sqs_result = sqs_client.receive_message(request).await.map_err(|err| { + Box::new(build_error(format!( + "Can't request more messages. Cause: {}", + err.to_string() + ))) + })?; + if sqs_result.messages.is_none() { + return Err(Box::new(build_error(String::from("The queue is empty")))); + } + Ok(sqs_result.messages.unwrap()) +} + +/// Get message body and delete it from queue. +/// +/// # Parameters +/// * `s3_bucket`: `S3` bucket where parquet is located; +/// * `sns_topic`: `SNS` topic to publish matched weather data; +/// * `aws_region`: Current AWS region; +/// * `message`: `Message` to process. +/// +/// # Errors +/// * Can't load provided file. +async fn process_message( + s3_bucket: &str, + sns_topic: &str, + aws_region: &Region, + message: Message, +) -> Result<(), AppError> { + debug!("Processing message"); + if message.body.is_none() { + return Ok(()); + } + let body = message.body.unwrap().clone(); + debug!("Received s3 file key: {}", body.as_str()); + let uuid = uuid::Uuid::new_v4().to_string(); + let file_name = format!("/tmp/{}.parquet", uuid); + load_and_save_parquet(body, aws_region, String::from(s3_bucket), file_name.clone()).await?; + parse_and_publish_parquet(file_name, sns_topic, aws_region).await?; + Ok(()) +} + +/// Loading and saving parquet file to filesystem. +/// All saved parquet files available until scaling down. +/// +/// # Parameters +/// * `file_key`: key to file in s3 storage; +/// * `aws_region`: Current AWS region where s3 located; +/// * `s3_bucket`: bucket obviously; +/// * `file_name`: name of file on local system where to store parquet. 
+/// +/// # Errors +/// * Can't load object from S3; +/// * Can't read object bytes; +/// * Can't create file in filesystem; +/// * Can't write to created file. +async fn load_and_save_parquet( + file_key: String, + aws_region: &Region, + s3_bucket: String, + file_name: String, +) -> Result<(), AppError> { + debug!("Loading s3 object"); + let s3_client = rusoto_s3::S3Client::new(aws_region.clone()); + let mut object_request = GetObjectRequest::default(); + object_request.bucket = s3_bucket.clone(); + object_request.key = file_key.clone(); + let s3_object = s3_client.get_object(object_request).await?; + if s3_object.body.is_none() { + return Err(Box::new(build_error(String::from( + "S3 object body is null", + )))); + } + let body = s3_object.body.unwrap(); + let mut contents: Vec<u8> = vec![]; + body.into_async_read().read_to_end(&mut contents).await?; + let mut file = File::create(file_name).await?; + file.write_all(contents.as_slice()).await?; + Ok(()) +} + +/// Parse parquet file and publish it to SNS. +/// # Parameters: +/// * `filename`: file in local system to read parquet file; +/// * `aws_region`: current AWS region; +/// +/// # Errors +/// * The humidity is above 10; +/// * The pressure is above 100; +/// * The temperature is less than 30. +async fn parse_and_publish_parquet( + file_name: String, + sns_topic: &str, + aws_region: &Region, +) -> Result<(), AppError> { + debug!("Parsing parquet file {}", file_name.as_str()); + let records = lambda_models::WeatherDataOutput::from_parquet_file(file_name).await?; + let sns_client = rusoto_sns::SnsClient::new(aws_region.clone()); + for item in records { + debug!("{:#?}", item.clone()); + if item.humidity > 10 && item.pressure > 100 && item.temperature < 30 { + match publish_to_sns(item, sns_topic, &sns_client).await { + Ok(_) => { + info!("Weather data successfully processed"); + } + Err(err) => { + warn!("Can't send weather data to SNS. 
Cause {}", err.to_string()); + } + } + } else { + warn!("Weather data is ok"); + } + } + Ok(()) +} + +/// Publish valid messages to SNS +/// +/// # Parameters +/// `weather_data`: parsed parquet from `S3`; +/// `sns_topic`: topic to publish to; +/// `sns_client`: Connection to `SNS`. +/// +/// # Errors +/// * Can't serialize `WeatherData` to JSON; +/// * Can't publish to `SNS`. +async fn publish_to_sns( + weather_data: WeatherDataOutput, + sns_topic: &str, + sns_client: &SnsClient, +) -> Result<(), AppError> { + debug!("Publishing to SNS"); + let mut publish_input = PublishInput::default(); + publish_input.message = serde_json::to_string(&weather_data)?; + publish_input.topic_arn = Some(String::from(sns_topic)); + sns_client.publish(publish_input).await?; + Ok(()) +} diff --git a/p.kirilin/task.2/applications/s3_lambda_function/Cargo.toml b/p.kirilin/task.2/applications/s3_lambda_function/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..6feabf57056f2a09a520570eddc65a314af6c65f --- /dev/null +++ b/p.kirilin/task.2/applications/s3_lambda_function/Cargo.toml @@ -0,0 +1,43 @@ +[package] +name = "s3_lambda_function" +version = "0.1.0" +authors = ["Pavel Kirilin <pavel.kirilin@simbirsoft.com>"] +edition = "2018" +autobins = false +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[[bin]] +name = "bootstrap" +path = "src/main.rs" + +[dependencies] +log = "0.4" +dotenv = "0.15" +serde_json = "1.0" +env_logger = "0.7" +aws_lambda_events = "=0.2.5" + +# Use sources from git because, the version in crates is +# out of date and won't be updated any soon, as i understand. 
+[dependencies.lambda] +git = "https://github.com/awslabs/aws-lambda-rust-runtime" + +[dependencies.tokio] +version = "0.2.4" +features = [ + "rt-core" +] + +[dependencies.rusoto_core] +version = "0.43" +default_features = false +features = [ + "rustls" +] + +[dependencies.rusoto_sqs] +version = "0.43" +default_features = false +features = [ + "rustls" +] \ No newline at end of file diff --git a/p.kirilin/task.2/applications/s3_lambda_function/Makefile b/p.kirilin/task.2/applications/s3_lambda_function/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..53efdc78085f3945189dc023eeba4bd943fccada --- /dev/null +++ b/p.kirilin/task.2/applications/s3_lambda_function/Makefile @@ -0,0 +1,32 @@ +SRC_DIR=$(shell pwd) + +# Default target +all: validate + +# Run formatting +fmt: + cargo fmt + +# Find some common vulnerabilities if any. +clippy: + cargo clippy --all-targets --all-features -- -D "clippy::pedantic" + +# Standard unit testing. +test: + cargo test + +# Run all checks +validate: fmt clippy test + +# Build binary with musl-gcc target (Required by AWS runtime) +build_lambda: + @rustup target add x86_64-unknown-linux-musl + cargo build --release --target x86_64-unknown-linux-musl + +# Package resulting binary in zip +# If zip_name variable defined then it'll pack bin in specific zip. +install: build_lambda +ifndef zip_name + $(eval zip_name=lambda) +endif + zip -j "${zip_name}.zip" target/x86_64-unknown-linux-musl/release/bootstrap diff --git a/p.kirilin/task.2/applications/s3_lambda_function/src/main.rs b/p.kirilin/task.2/applications/s3_lambda_function/src/main.rs new file mode 100644 index 0000000000000000000000000000000000000000..ef5b9e76ef255183082d3a8420fd4c691c24e3a4 --- /dev/null +++ b/p.kirilin/task.2/applications/s3_lambda_function/src/main.rs @@ -0,0 +1,31 @@ +#[macro_use] +extern crate log; + +/// Main processing module. 
+/// All processing function placed here +pub mod processors; + +use std::error::Error; +use tokio::runtime::Runtime; + +/// Common error type. +/// It can be used with any error that +/// implements `Send` + `Sync` traits. +type LambdaError = Box<dyn Error + Send + Sync + 'static>; + +/// This function starting point of the binary. It points the lambda to execution function. +/// Real lambda processor located in `processors` module. +fn main() -> Result<(), LambdaError> { + dotenv::dotenv().ok(); + + let env = env_logger::Env::default() + .filter_or("LOG_LEVEL", "info") + .write_style_or("LOG_STYLE", "never"); + + env_logger::init_from_env(env); + + let mut rt = Runtime::new().expect("can't start Tokio runtime"); + let handler = lambda::handler_fn(processors::process); + rt.block_on(lambda::run(handler))?; + Ok(()) +} diff --git a/p.kirilin/task.2/applications/s3_lambda_function/src/processors.rs b/p.kirilin/task.2/applications/s3_lambda_function/src/processors.rs new file mode 100644 index 0000000000000000000000000000000000000000..d29d5fcf9349e54f18ad54c5bddb29a782e47b99 --- /dev/null +++ b/p.kirilin/task.2/applications/s3_lambda_function/src/processors.rs @@ -0,0 +1,61 @@ +use crate::LambdaError; +use aws_lambda_events::event::s3::{S3Event, S3EventRecord}; +use rusoto_core::Region; +use rusoto_sqs::{SendMessageRequest, Sqs}; +use std::io::ErrorKind; +use std::str::FromStr; + +/// Main data processing point. +/// +/// This function proceed S3 put events. +/// After object was created in s3 this processor will send information +/// about target records in sqs. +/// +/// # Errors +/// +/// Will return `Err` if any problem occur during processing. 
+pub async fn process(event: S3Event) -> Result<(), Vec<LambdaError>> { + info!("Handler started"); + for record in event.records { + match process_input(record).await { + Ok(_) => info!("Information successfully sent."), + Err(err) => { + error!("Can't send information about this event"); + error!("{}", err.to_string()) + } + } + } + Ok(()) +} + +/// Main processor logic. +/// It will publish information about new object from s3 in SQS. +/// +/// `AWS_REGION` is variable provided by AWS lambda environment. +/// `SQS_QUEUE_ID` is injected by terraform. +/// +/// # Errors +/// * The `AWS_REGION` variable not found; +/// * The `AWS_REGION` env variable is incorrect; +/// * The `SQS_QUEUE_ID` env is not set; +/// * The `SQS_QUEUE_ID` env is incorrect. +pub async fn process_input(record: S3EventRecord) -> Result<(), LambdaError> { + let queue_url = std::env::var("SQS_QUEUE_ID")?; + let current_region = std::env::var("AWS_REGION")?; + let client = rusoto_sqs::SqsClient::new(Region::from_str(current_region.as_str())?); + info!("Created sqs client"); + let mut message = SendMessageRequest::default(); + info!("Resolved queue url: {}", queue_url); + message.queue_url = queue_url; + let object_key = record.s3.object.key; + info!("Event data key: {:?}", object_key); + if object_key.is_none() { + return Err(Box::new(std::io::Error::new( + ErrorKind::Other, + String::from("Object path is unknown!"), + ))); + } + message.message_body = object_key.unwrap(); + client.send_message(message).await?; + Ok(()) +} diff --git a/p.kirilin/task.2/aws_services/asg.tf b/p.kirilin/task.2/aws_services/asg.tf new file mode 100644 index 0000000000000000000000000000000000000000..c28ea5bdff4566c9e916df7909c8fa9e451a0734 --- /dev/null +++ b/p.kirilin/task.2/aws_services/asg.tf @@ -0,0 +1,105 @@ +data "aws_availability_zones" "available" { + state = "available" +} + +resource "aws_autoscaling_group" "s3rius-asg-processors" { + max_size = 3 + min_size = 0 + force_delete = true + launch_configuration 
= aws_launch_configuration.asg-launch-config.name + availability_zones = data.aws_availability_zones.available.names + + health_check_type = "ELB" + load_balancers = [ + aws_elb.load_balancer.name + ] +} + +resource "aws_autoscaling_policy" "scale_up_policy" { + name = "sqs-asg-scale-up-policy" + scaling_adjustment = 1 + autoscaling_group_name = aws_autoscaling_group.s3rius-asg-processors.name + adjustment_type = "ChangeInCapacity" + cooldown = 300 +} + +resource "aws_autoscaling_policy" "scale_down_policy" { + name = "sqs-asg-scale-down-policy" + scaling_adjustment = -1 + autoscaling_group_name = aws_autoscaling_group.s3rius-asg-processors.name + adjustment_type = "ChangeInCapacity" + cooldown = 300 +} + + +resource "aws_elb" "load_balancer" { + name = "terraform-asg-example" + security_groups = [ + aws_security_group.outbound_traffic_security.id + ] + + availability_zones = data.aws_availability_zones.available.names + + health_check { + target = "HTTP:${var.ec2_port}/" + interval = 30 + timeout = 3 + healthy_threshold = 2 + unhealthy_threshold = 2 + } + + # This adds a listener for incoming HTTP requests. 
+ listener { + lb_port = var.elb_port + lb_protocol = "http" + instance_port = var.ec2_port + instance_protocol = "http" + } +} + +resource "aws_security_group" "inbound_traffic_security" { + name = "inbound-ec2" + + ingress { + from_port = var.elb_port + to_port = var.elb_port + protocol = "tcp" + cidr_blocks = [ + "0.0.0.0/0" + ] + } + + ingress { + from_port = 22 + to_port = 22 + protocol = "tcp" + cidr_blocks = [ + "0.0.0.0/0" + ] + } +} + +resource "aws_security_group" "outbound_traffic_security" { + name = "outbound-ec2" + + # Allow all outbound + egress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = [ + "0.0.0.0/0" + ] + } + + # Inbound HTTP from anywhere + ingress { + from_port = var.ec2_port + to_port = var.ec2_port + protocol = "tcp" + cidr_blocks = [ + "0.0.0.0/0" + ] + } +} + diff --git a/p.kirilin/task.2/aws_services/cloudwatch.tf b/p.kirilin/task.2/aws_services/cloudwatch.tf new file mode 100644 index 0000000000000000000000000000000000000000..f862d9ec2b5b6c1f252c94e134361f54f106d26e --- /dev/null +++ b/p.kirilin/task.2/aws_services/cloudwatch.tf @@ -0,0 +1,50 @@ +resource "aws_cloudwatch_log_group" "s3_lamda_logs" { + name = "/aws/lambda/${aws_lambda_function.s3_lambda.function_name}" + retention_in_days = 1 +} + + +resource "aws_cloudwatch_log_group" "asg_ec2_logs" { + name = "ec2_logs" + retention_in_days = 1 +} + +resource "aws_cloudwatch_metric_alarm" "sqs-asg-metric-alarm" { + alarm_name = "SQS-message-alarm" + comparison_operator = "GreaterThanOrEqualToThreshold" + evaluation_periods = "1" + metric_name = "NumberOfMessagesSent" + namespace = "AWS/SQS" + period = "60" + statistic = "SampleCount" + threshold = "2" + + dimensions = { + QueueName = aws_sqs_queue.s3rius_sqs_queue.name + } + + alarm_description = "This metric monitors SQS messages and fire ec2 from asg" + alarm_actions = [ + aws_autoscaling_policy.scale_up_policy.arn + ] +} + +resource "aws_cloudwatch_metric_alarm" "cpu-asg-metric-alarm" { + alarm_name = 
"CPU-message-alarm" + evaluation_periods = 2 + comparison_operator = "LessThanThreshold" + metric_name = "CPUUtilization" + namespace = "AWS/EC2" + period = "120" + statistic = "Average" + threshold = "5" + + dimensions = { + AutoScalingGroupName = aws_autoscaling_group.s3rius-asg-processors.name + } + + alarm_description = "This metric monitors CPU utilization for scaling down" + alarm_actions = [ + aws_autoscaling_policy.scale_down_policy.arn + ] +} \ No newline at end of file diff --git a/p.kirilin/task.2/aws_services/ec2.tf b/p.kirilin/task.2/aws_services/ec2.tf new file mode 100644 index 0000000000000000000000000000000000000000..ac031dde901ce3ae020e2609c96b8920354096cf --- /dev/null +++ b/p.kirilin/task.2/aws_services/ec2.tf @@ -0,0 +1,118 @@ +resource "aws_launch_configuration" "asg-launch-config" { + name = "s3rius-autoscaling-launch-config" + image_id = "ami-0a9e2b8a093c02922" + instance_type = "t2.micro" + security_groups = [ + aws_security_group.inbound_traffic_security.id, + aws_security_group.outbound_traffic_security.id + ] + + iam_instance_profile = aws_iam_instance_profile.s3rius_profile.id + key_name = aws_key_pair.aws_key_pair.key_name + user_data = data.template_file.asg_user_data.rendered + depends_on = [ + aws_s3_bucket_object.application_data, + ] +} + +resource "aws_key_pair" "aws_key_pair" { + key_name = "aws_key_pair" + public_key = file(var.ec2_ssh_key_file) +} + +data "template_file" "asg_user_data" { + template = file("${path.module}/utils/asg_user_data.sh") + vars = { + meta_bucket = aws_s3_bucket.asg_meta_bucket.bucket + application_key = var.ec2_application_s3_key + sqs_queue_id = aws_sqs_queue.s3rius_sqs_queue.id + aws_region = var.aws_region + cloudwatch_conf_key = var.ec2_cloudwatch_conf_key + s3_bucket = aws_s3_bucket.s3_weather_bucket.bucket + sns_topic = aws_sns_topic.asg-weather-topic.arn + } +} + + +resource "aws_iam_instance_profile" "s3rius_profile" { + name = "s3rius" + role = aws_iam_role.ec2_profile_role.name +} + 
+resource "aws_iam_role" "ec2_profile_role" { + name = "ec2_profile_role" + path = "/" + assume_role_policy = data.aws_iam_policy_document.ec2_role_doc.json +} + +resource "aws_iam_role_policy_attachment" "ec2_extra_role_policy" { + role = aws_iam_role.ec2_profile_role.name + policy_arn = aws_iam_policy.ec2_extra_policy.arn +} + +resource "aws_iam_policy" "ec2_extra_policy" { + path = "/" + policy = data.aws_iam_policy_document.ec2_extra_doc.json +} + + +data "aws_iam_policy_document" "ec2_role_doc" { + version = "2012-10-17" + + statement { + actions = [ + "sts:AssumeRole" + ] + + principals { + identifiers = [ + "ec2.amazonaws.com" + ] + type = "Service" + } + + effect = "Allow" + } +} + +data "aws_iam_policy_document" "ec2_extra_doc" { + version = "2012-10-17" + + statement { + effect = "Allow" + actions = [ + "s3:GetBucketLocation", + "s3:GetObject", + "s3:ListBucket", + "s3:ListBucketMultipartUploads", + ] + resources = [ + aws_s3_bucket.asg_meta_bucket.arn, + "${aws_s3_bucket.asg_meta_bucket.arn}/*", + aws_s3_bucket.s3_weather_bucket.arn, + "${aws_s3_bucket.s3_weather_bucket.arn}/*" + + ] + } + + statement { + effect = "Allow" + actions = [ + "sqs:*" + ] + resources = [ + aws_sqs_queue.s3rius_sqs_queue.arn + ] + } + + statement { + effect = "Allow" + actions = [ + "sns:Publish", + ] + resources = [ + aws_sns_topic.asg-weather-topic.arn + ] + } + +} \ No newline at end of file diff --git a/p.kirilin/task.2/aws_services/lambda_for_s3.tf b/p.kirilin/task.2/aws_services/lambda_for_s3.tf new file mode 100644 index 0000000000000000000000000000000000000000..20f2073ed50d0db191541d874632ce794e9716ad --- /dev/null +++ b/p.kirilin/task.2/aws_services/lambda_for_s3.tf @@ -0,0 +1,82 @@ +resource "aws_lambda_function" "s3_lambda" { + source_code_hash = filebase64sha256(var.s3_lambda_zip) + role = aws_iam_role.s3_lambda_role.arn + memory_size = var.lambda_memory_size + timeout = var.lambda_timeout + function_name = var.s3_lambda_name + filename = var.s3_lambda_zip + handler 
= "Provided" + runtime = "provided" + + environment { + variables = { + SQS_QUEUE_ID = aws_sqs_queue.s3rius_sqs_queue.id, + LOG_LEVEL = "debug,hyper=error,rusoto_core=info", + LOG_STYLE = "none" + } + } +} + +resource "aws_lambda_permission" "allow_bucket" { + statement_id = "AllowExecutionFromS3Bucket" + action = "lambda:InvokeFunction" + function_name = aws_lambda_function.s3_lambda.arn + principal = "s3.amazonaws.com" + source_arn = aws_s3_bucket.s3_weather_bucket.arn +} + +resource "aws_iam_role" "s3_lambda_role" { + name = var.s3_lambda_name + assume_role_policy = data.aws_iam_policy_document.s3_lambda_policy_doc.json +} + +resource "aws_iam_role_policy_attachment" "s3_lambda_policy_attachment" { + role = aws_iam_role.s3_lambda_role.name + policy_arn = aws_iam_policy.s3_lambda_extra_doc.arn +} + +resource "aws_iam_policy" "s3_lambda_extra_doc" { + path = "/" + policy = data.aws_iam_policy_document.s3_lambda_extra_doc.json +} + + +data "aws_iam_policy_document" "s3_lambda_policy_doc" { + version = "2012-10-17" + statement { + effect = "Allow" + actions = [ + "sts:AssumeRole", + ] + principals { + identifiers = [ + "lambda.amazonaws.com" + ] + type = "Service" + } + } +} + +data "aws_iam_policy_document" "s3_lambda_extra_doc" { + statement { + effect = "Allow" + actions = [ + "logs:PutLogEvents", + "logs:CreateLogStream", + "logs:CreateLogGroup" + ] + resources = [ + aws_cloudwatch_log_group.s3_lamda_logs.arn + ] + } + + statement { + effect = "Allow" + actions = [ + "sqs:SendMessage", + ] + resources = [ + aws_sqs_queue.s3rius_sqs_queue.arn + ] + } +} \ No newline at end of file diff --git a/p.kirilin/task.2/aws_services/s3_for_asg.tf b/p.kirilin/task.2/aws_services/s3_for_asg.tf new file mode 100644 index 0000000000000000000000000000000000000000..547e57a6dd1e3943b87db0db75556c5498f890d4 --- /dev/null +++ b/p.kirilin/task.2/aws_services/s3_for_asg.tf @@ -0,0 +1,11 @@ +resource "aws_s3_bucket" "asg_meta_bucket" { + bucket = "s3rius-asg-meta-bucket" + acl = 
"private" + force_destroy = true +} + +resource "aws_s3_bucket_object" "application_data" { + bucket = aws_s3_bucket.asg_meta_bucket.bucket + key = var.ec2_application_s3_key + source = var.ec2_application_bin +} \ No newline at end of file diff --git a/p.kirilin/task.2/aws_services/s3_weather_bucket.tf b/p.kirilin/task.2/aws_services/s3_weather_bucket.tf new file mode 100644 index 0000000000000000000000000000000000000000..0acafcddc3e8719bb41c1e5374a132ebf8466b22 --- /dev/null +++ b/p.kirilin/task.2/aws_services/s3_weather_bucket.tf @@ -0,0 +1,17 @@ +resource "aws_s3_bucket" "s3_weather_bucket" { + bucket = var.s3_weather_bucket + acl = "private" + force_destroy = true +} + +resource "aws_s3_bucket_notification" "weather_bucket_notification" { + bucket = aws_s3_bucket.s3_weather_bucket.id + + lambda_function { + lambda_function_arn = aws_lambda_function.s3_lambda.arn + events = ["s3:ObjectCreated:*"] + filter_prefix = "AUTHORIZED/" + } + + depends_on = [aws_lambda_permission.allow_bucket] +} \ No newline at end of file diff --git a/p.kirilin/task.2/aws_services/sns.tf b/p.kirilin/task.2/aws_services/sns.tf new file mode 100644 index 0000000000000000000000000000000000000000..ec45b604e6ff607bfd4e9f65a9417dc5782a28df --- /dev/null +++ b/p.kirilin/task.2/aws_services/sns.tf @@ -0,0 +1,3 @@ +resource "aws_sns_topic" "asg-weather-topic" { + name = "s3rius-asg-weather-topic" +} diff --git a/p.kirilin/task.2/aws_services/sqs.tf b/p.kirilin/task.2/aws_services/sqs.tf new file mode 100644 index 0000000000000000000000000000000000000000..362100c94d9f44c20e98de962fb0bfbb39361b98 --- /dev/null +++ b/p.kirilin/task.2/aws_services/sqs.tf @@ -0,0 +1,34 @@ +resource "aws_sqs_queue" "s3rius_sqs_queue" { + name = "s3rius-sqs-autoscale" + message_retention_seconds = 1200 +} + +resource "aws_sqs_queue_policy" "sqs_publish_policy" { + queue_url = aws_sqs_queue.s3rius_sqs_queue.id + policy = data.aws_iam_policy_document.sqs_send_message_policy_doc.json +} + +data "aws_iam_policy_document" 
"sqs_send_message_policy_doc" { + statement { + effect = "Allow" + actions = [ + "sqs:SendMessage" + ] + principals { + type = "AWS" + identifiers = [ + "*" + ] + } + resources = [ + aws_sqs_queue.s3rius_sqs_queue.arn + ] + condition { + test = "ArnEquals" + variable = "aws:SourceArn" + values = [ + aws_lambda_function.s3_lambda.arn + ] + } + } +} \ No newline at end of file diff --git a/p.kirilin/task.2/aws_services/utils/asg_user_data.sh b/p.kirilin/task.2/aws_services/utils/asg_user_data.sh new file mode 100644 index 0000000000000000000000000000000000000000..6cd78c55b988a878db75bbbfd77308ffdc534995 --- /dev/null +++ b/p.kirilin/task.2/aws_services/utils/asg_user_data.sh @@ -0,0 +1,19 @@ +#!/bin/bash +LOG_FILE="/var/log/sqs_listener_logs.log" + +function main() { + aws s3 cp "s3://${meta_bucket}/${application_key}" "application.zip" + export SQS_QUEUE_ID="${sqs_queue_id}" + export AWS_REGION="${aws_region}" + export S3_BUCKET="${s3_bucket}" + export SNS_TOPIC="${sns_topic}" + export LOG_LEVEL="INFO" + export LOG_FILE="$LOG_FILE" + unzip application.zip + chmod 777 ./bootstrap + ./bootstrap +} + +exec > >(tee "$LOG_FILE" | logger -t user-data -s 2>/dev/console) 2>&1 + echo "################# INSTANCE STARTUP #################" + main \ No newline at end of file diff --git a/p.kirilin/task.2/aws_services/variables.tf b/p.kirilin/task.2/aws_services/variables.tf new file mode 100644 index 0000000000000000000000000000000000000000..daa1b221466566a6307a60ae1ac34cd425b9bff1 --- /dev/null +++ b/p.kirilin/task.2/aws_services/variables.tf @@ -0,0 +1,52 @@ +variable "s3_weather_bucket" { + type = string +} + +variable "s3_lambda_zip" { + type = string +} + +variable "s3_lambda_name" { + type = string +} + +variable "ec2_ssh_key_file" { + type = string +} + +variable "elb_port" { + type = number + default = 8100 +} + +variable "ec2_port" { + type = number + default = 80 +} + +variable "ec2_application_s3_key" { + type = string +} + +variable "ec2_cloudwatch_conf_key" { + 
type = string
+  default = "ec2_log_config.conf"
+}
+
+variable "ec2_application_bin" {
+  type = string
+}
+
+variable "aws_region" {
+  type = string
+}
+
+variable "lambda_timeout" {
+  type = number
+  default = 10
+}
+
+variable "lambda_memory_size" {
+  type = number
+  default = 128
+}
\ No newline at end of file
diff --git a/p.kirilin/task.2/build.sh b/p.kirilin/task.2/build.sh
new file mode 100644
index 0000000000000000000000000000000000000000..a1f56dfd20b9c3b9faca59a9fe51d61dee2d459b
--- /dev/null
+++ b/p.kirilin/task.2/build.sh
+#!/bin/bash
+# Must be bash, not sh: this script uses bash-only features (arrays,
+# [[ ]], pushd/popd, "function"), which break under POSIX sh/dash.
+# Invoke it as `bash build.sh`, not `sh build.sh`.
+ERR='\033[0;31m'
+WARNING='\033[0;33m'
+NC='\033[0m' # No Color
+
+key_name="ec2_key.pub"
+applications=( "s3_lambda_function" "ec2_sqs_listener" )
+
+
+# Build each application with make, move the produced zip next to the
+# terraform configuration, and export TF_VAR_<name> so terraform picks it up.
+function build_application(){
+    for application in "${applications[@]}"; do
+        application_path="./applications/$application/"
+        # Going in application directory
+        pushd "$application_path" || { printf "${ERR}Unknown application path:${NC}${application_path}\n"; exit 1; }
+        # Building application function with make
+        make || { printf "${ERR}Can't verify application package${NC}\n"; popd; exit 1; }
+        make install zip_name="$application" || { printf "${ERR}Can't pack application zip${NC}\n"; popd; exit 1; }
+        # Going back to start location
+        popd || exit 1
+        mv "$application_path/$application.zip" . 
+        # Exporting variable for terraform
+        eval "export TF_VAR_${application}=${application}.zip"
+    done
+}
+
+# Remove the generated application .zip packages
+function rm_applications() {
+    # Iterate over the known applications
+    for application in "${applications[@]}";do
+        # Check if application exists
+        if [[ -f "${application}.zip" ]];then
+            # Print message and delete
+            printf "${WARNING}Removing application zip_file: '${application}'${NC}\n"
+            rm "${application}.zip"
+        else
+            # Application zip was not found
+            printf "${ERR}Application ${application}.zip was not found${NC}\n"
+        fi
+    done
+}
+
+# Trap handler: delete the built zips and the copied ssh public key.
+function cleanup() {
+    rm_applications
+    printf "${WARNING}Removing ec2 public key${NC}\n"
+    rm -v "./${key_name}"
+}
+
+# Copy the user's public ssh key into the build directory and expose its
+# path to terraform via TF_VAR_ec2_ssh_key_file.
+function copy_ssh_key() {
+    cp ~/.ssh/id_rsa.pub "./${key_name}"
+    printf "${WARNING}ssh public key copied in build directory${NC}\n"
+    export TF_VAR_ec2_ssh_key_file="./${key_name}"
+}
+
+function main(){
+    copy_ssh_key
+    # Build and pack the application zips (s3 lambda + EC2 sqs listener)
+    build_application
+    # Format files.
+    terraform fmt -recursive
+    # Install all modules, plugin and other stuff
+    terraform init
+    # Apply the configuration with the exported TF_VAR_* variables injected
+    terraform apply
+}
+
+# Export s3 bucket name to be the same as in the first task. 
+if [[ -n "$1" ]]; then
+    # BUGFIX: the task.2 root module declares `variable "s3_weather_bucket"`
+    # (see variables.tf below) and no `s3_bucket_name`, so the previous
+    # TF_VAR_s3_bucket_name export was silently ignored by terraform and the
+    # bucket name passed from the Makefile never took effect.
+    export TF_VAR_s3_weather_bucket=$1
+fi
+
+# Remove packaged applications on exit
+trap cleanup EXIT TERM
+main
diff --git a/p.kirilin/task.2/instance.tf b/p.kirilin/task.2/instance.tf
new file mode 100644
index 0000000000000000000000000000000000000000..ddbc864cff0011f58b52a561818d6e3d78338afa
--- /dev/null
+++ b/p.kirilin/task.2/instance.tf
+# AWS provider pinned to the 2.x series; credentials are taken from the
+# "main_profile" shared-credentials profile.
+provider "aws" {
+  version = "~> 2.0"
+  region = var.region
+  profile = "main_profile"
+}
+
+# Wire the root-level variables into the aws_services module. The listener
+# zip name doubles as its S3 object key (ec2_application_s3_key).
+module "services" {
+  source = "./aws_services"
+  s3_weather_bucket = var.s3_weather_bucket
+  s3_lambda_zip = var.s3_lambda_function
+  s3_lambda_name = "s3rius_s3_lambda"
+  ec2_ssh_key_file = var.ec2_ssh_key_file
+  aws_region = var.region
+  ec2_application_bin = var.ec2_sqs_listener
+  ec2_application_s3_key = var.ec2_sqs_listener
+}
\ No newline at end of file
diff --git a/p.kirilin/task.2/variables.tf b/p.kirilin/task.2/variables.tf
new file mode 100644
index 0000000000000000000000000000000000000000..be044baa248da07e2fec5de1a75a9712602b4403
--- /dev/null
+++ b/p.kirilin/task.2/variables.tf
+# Name of the shared weather bucket; overridable via TF_VAR_s3_weather_bucket.
+variable "s3_weather_bucket" {
+  type = string
+  default = "s3rius-weather-bucket"
+}
+
+variable "s3_lambda_function" {
+  type = string
+  default = "s3_lambda_function.zip"
+}
+
+variable "sqs_listener_application" {
+  type = string
+  default = "sqs_listener_application"
+}
+
+variable "ec2_ssh_key_file" {
+  type = string
+  default = "id_rsa.pub"
+}
+
+variable "ec2_sqs_listener" {
+  type = string
+  default = "ec2_sqs_listener.zip"
+}
+
+variable "region" {
+  type = string
+  default = "eu-central-1"
+}
\ No newline at end of file