AWS Open Source Blog

Flowpipe: A Cloud Scripting Engine for DevOps Workflows

Flowpipe is an open source tool built for DevOps teams. You can express pipelines, steps, and triggers in HCL, the familiar DevOps language, to orchestrate tasks that query AWS (and other services you regularly use). Compose pipelines using mods, and mix in SQL, AWS Lambda-compatible functions, or containers as needed. Develop, test, and run it all on your local machine, with no deployment surprises or long debug cycles. Then schedule the pipelines or respond to events in real time. Use it to:

  • Orchestrate your cloud. Build simple steps into complex workflows. Run and test locally. Compose solutions across clouds using open source mods.
  • Connect people and tools. Connect your cloud data to people and systems using email, chat & APIs. Workflow steps can even run containers, custom functions, and more.
  • Respond to events. Run workflows manually or on a schedule. Trigger pipelines from webhooks or changes in data.
  • Use code, not clicks. Build and deploy DevOps workflows like infrastructure. Code in HCL and deploy from version control.

Here’s a Flowpipe command to deactivate expired AWS Identity and Access Management (IAM) access keys, and notify a Slack channel with messages like “The access key KEY_ID for user USER_NAME has been deactivated.”

flowpipe pipeline run pipeline.deactivate_expired_aws_iam_access_keys \
  --arg slack_channel=iam_hygiene

Let’s break this down.

flowpipe

The binary that you run on your local machine, or in a cloud VM or CI/CD pipeline

flowpipe pipeline

The subcommand to list, view, and run pipelines

flowpipe pipeline run pipeline.deactivate_expired_aws_iam_access_keys

Invocation of a pipeline that checks for expired keys, deactivates them, and sends a Slack notification

--arg slack_channel=iam_hygiene

The Slack channel to notify

If you want to try this yourself, here’s the complete recipe.

  1. Download and install Flowpipe.
  2. Clone the samples repo:

git clone https://github.com/turbot/flowpipe-samples

  3. Visit the subdirectory for this sample:

cd flowpipe-samples/public_cloud/deactivate_expired_aws_iam_access_keys

  4. Install dependencies:

flowpipe mod install

In this case, the sample relies on two library mods: AWS and Slack.

flowpipe mod list

deactivate_expired_aws_iam_access_keys
├── github.com/turbot/flowpipe-mod-aws@v0.1.0
└── github.com/turbot/flowpipe-mod-slack@v0.1.0

  5. Set up credentials:

export AWS_PROFILE={YOUR_PROFILE_NAME}
export SLACK_TOKEN={YOUR_SLACK_TOKEN}

Alternatively, you could use Flowpipe credentials to manage these secrets.

  6. Run the command:

flowpipe pipeline run pipeline.deactivate_expired_aws_iam_access_keys \
  --arg slack_channel=iam_hygiene \
  --arg aws_expire_after_days=60

This example also sets the argument aws_expire_after_days to 60, overriding its default of 90 days.

Show me the code!

Like all Flowpipe samples and libraries, this sample is available on GitHub under an Apache 2.0 license. Because it relies on the AWS and Slack libraries, the sample’s code is concise.

pipeline "deactivate_expired_aws_iam_access_keys" {
  title       = "Deactivate expired AWS IAM access keys"
  description = "Deactivates expired AWS IAM access keys and notifies via Slack channel."

  param "slack_channel" {
    type        = string
    description = "Channel, private group, or IM channel to send message to."
  }

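  param "aws_expire_after_days" {
    type        = number
    description = "Number of days after which the access key should be deactivated."
    default     = 90
  }

  # The steps below reference param.aws_cred and param.slack_cred, which this
  # excerpt does not define. Assumed definitions: each names a credential and
  # falls back to "default".
  param "aws_cred" {
    type    = string
    default = "default"
  }

  param "slack_cred" {
    type    = string
    default = "default"
  }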

  step "pipeline" "list_iam_users" {
    pipeline = aws.pipeline.list_iam_users
  }

  step "pipeline" "list_iam_access_keys" {
    for_each = { for user in step.pipeline.list_iam_users.output.users : user.UserName => user.UserName }
    pipeline = aws.pipeline.list_iam_access_keys
    args = {
      cred      = param.aws_cred
      user_name = each.value
    }
  }

  step "pipeline" "update_iam_access_key" {
    for_each = flatten([for accessKey in step.pipeline.list_iam_access_keys : accessKey.output.access_keys])
    # Run only if the access key is active and older than specified number of days.
    if       = each.value.Status == "Active" && timecmp(each.value.CreateDate, timeadd(timestamp(), "-${param.aws_expire_after_days * 24}h")) < 0
    pipeline = aws.pipeline.update_iam_access_key
    args = {
      cred          = param.aws_cred
      user_name     = each.value.UserName
      access_key_id = each.value.AccessKeyId
      status        = "Inactive"
    }
  }

  step "pipeline" "post_message" {
    for_each = flatten([for accessKey in step.pipeline.list_iam_access_keys : accessKey.output.access_keys])
    # Run only if the access key is active and older than specified number of days.
    if = each.value.Status == "Active" && timecmp(each.value.CreateDate, timeadd(timestamp(), "-${param.aws_expire_after_days * 24}h")) < 0

    pipeline = slack.pipeline.post_message
    args = {
      cred    = param.slack_cred
      channel = param.slack_channel
      text    = "The access key ${each.value.AccessKeyId} for user ${each.value.UserName} has been deactivated."
    }
  }
}

Like all pipelines, this one comprises a set of steps. In this case there are four: two that list IAM users and their access keys, one that deactivates expired access keys, and one that notifies Slack. All four are of type pipeline, that is, steps that invoke other pipelines. But a pipeline can use any or all of these step types.

Table: Open source Flowpipe step types

For example, the helper pipeline aws.pipeline.list_iam_access_keys uses a container step to invoke the AWS CLI.

step "container" "list_iam_access_keys" {
  image = "public.ecr.aws/aws-cli/aws-cli"

  cmd = concat(
    ["iam", "list-access-keys"],
    param.user_name != null ? ["--user-name", "${param.user_name}"] : []
  )

  env = credential.aws[param.cred].env
}
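
The calling pipeline reads accessKey.output.access_keys, so the helper pipeline has to expose the CLI result as a pipeline output. Here is a minimal sketch of how that could look. It assumes the container step exposes the command's standard output as stdout, and that the AWS CLI returns its usual JSON document with an AccessKeyMetadata list; the actual output block in the library mod may differ.

```hcl
# Sketch only: this block would sit inside the helper pipeline,
# next to the container step shown above.
output "access_keys" {
  description = "List of access keys returned by the AWS CLI."

  # Decode the CLI's JSON output and keep only the list of key records.
  value = jsondecode(step.container.list_iam_access_keys.stdout).AccessKeyMetadata
}
```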

With Flowpipe there’s always more than one way to do something, though. For example, Flowpipe does not require Steampipe, but will happily use it. So if you’ve installed Steampipe along with the AWS plugin, Flowpipe has access to the 460+ tables provided by the plugin, and you could alternatively do this.

step "query" "list_iam_access_keys" {
  connection_string = "postgres://steampipe@localhost:9193/steampipe"
  sql = <<EOQ
    select
      user_name,
      create_date,
      status
    from
      aws_iam_access_key
  EOQ
}
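
With the query approach, the expiry filter can move into SQL and a later step can iterate over the returned rows. The following is a sketch under assumptions, not the sample's actual code: the step name expired_keys is hypothetical, the query step is assumed to expose its results as rows, and the params are the ones used in the sample pipeline above.

```hcl
# Sketch only: expired_keys is a hypothetical step name.
step "query" "expired_keys" {
  connection_string = "postgres://steampipe@localhost:9193/steampipe"
  sql = <<EOQ
    select
      access_key_id,
      user_name
    from
      aws_iam_access_key
    where
      status = 'Active'
      and create_date < now() - interval '${param.aws_expire_after_days} days'
  EOQ
}

step "pipeline" "update_iam_access_key" {
  # One iteration per row returned by the query (assumes a rows attribute).
  for_each = step.query.expired_keys.rows
  pipeline = aws.pipeline.update_iam_access_key
  args = {
    cred          = param.aws_cred
    user_name     = each.value.user_name
    access_key_id = each.value.access_key_id
    status        = "Inactive"
  }
}
```

Filtering in SQL removes the need for the if argument on the update step, at the cost of requiring a running Steampipe service.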

Flowpipe HCL

Terraform supports a declarative approach to defining infrastructure. Flowpipe, similarly, enables declarative definition of workflow. DevOps pros will be instantly familiar with the basics of Flowpipe HCL: the core ingredients are blocks, arguments, and expressions; there’s no intrinsic ordering of blocks but you can impose an order when block B refers to block A; steps are packaged as modules that you can reuse and combine.

The same built-in expressions, meta-arguments, and functions are available. Examples shown above include the for expression, the for_each meta-argument (and its associated each object), and the functions flatten, timecmp, and timeadd. In addition to the standard Terraform functions, Flowpipe defines a few of its own including several related to error handling.
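
To see how those functions combine in the sample's if condition, here is the expiry test broken into two steps. This is an illustrative sketch: the pipeline name, param names, and default date are hypothetical, and it assumes the transform step type, which exposes its result as value.

```hcl
# Sketch only: a standalone pipeline that evaluates the expiry test for one date.
pipeline "expiry_check_demo" {
  param "create_date" {
    type    = string
    default = "2023-01-01T00:00:00Z" # hypothetical key creation time (RFC 3339)
  }

  param "expire_after_days" {
    type    = number
    default = 90
  }

  # 90 days * 24 = 2160, so this evaluates timeadd(now, "-2160h").
  step "transform" "cutoff" {
    value = timeadd(timestamp(), "-${param.expire_after_days * 24}h")
  }

  # timecmp returns -1 when the first timestamp is earlier than the second,
  # so a result below zero means the key was created before the cutoff.
  step "transform" "is_expired" {
    value = timecmp(param.create_date, step.transform.cutoff.value) < 0
  }

  output "is_expired" {
    value = step.transform.is_expired.value
  }
}
```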

There are also a number of built-in step arguments used for flow control, error handling, and documentation.

Table: Common step arguments in Flowpipe

With the addition of these affordances, Flowpipe HCL behaves more like a full-fledged programming language than Terraform HCL, which is primarily a resource configuration language.
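
As an illustration of the error-handling arguments and functions, here is a minimal sketch. The pipeline name and URL are placeholders, and the retry and error blocks and the is_error and error_message functions are used as I understand them from the Flowpipe documentation; check the current docs before relying on the exact attribute names.

```hcl
# Sketch only: check_service and the URL are placeholders.
pipeline "check_service" {
  step "http" "get_status" {
    url = "https://example.com/status"

    # Retry a failed request a few times before giving up.
    retry {
      max_attempts = 3
    }

    # If it still fails, record the error and let the pipeline continue.
    error {
      ignore = true
    }
  }

  # Summarize the outcome using the error-handling functions.
  step "transform" "summary" {
    value = is_error(step.http.get_status) ? "failed: ${error_message(step.http.get_status)}" : "ok"
  }

  output "summary" {
    value = step.transform.summary.value
  }
}
```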

Composable Mods and the Flowpipe Hub

Flowpipe mods are open source composable pipelines so you can remix and reuse your code — or build on the great work of the community. The Flowpipe Hub is a searchable directory of mods to discover and use. The source code for mods is available on GitHub if you’d like to learn or contribute.

Flowpipe library mods make it easy to work with common services including AWS, GitHub, Jira, Slack, Teams, Zendesk … and many more!

Flowpipe sample mods are ready-to-run samples that demonstrate patterns and use of various library mods.

It’s easy to create your own mod that runs standalone, perhaps calling dependent mods as shown above. And any mod you create can itself be called by another. So, for example, the deactivate_expired_aws_iam_access_keys mod could be one step in a higher-level mod that enforces a set of access-related policies.
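
A sketch of that kind of composition follows. The pipeline enforce_access_policies is hypothetical and is assumed to live in the same mod as the sample pipeline, so it can refer to it as pipeline.deactivate_expired_aws_iam_access_keys; calling it from a different mod would instead use a mod-qualified reference, as in aws.pipeline.list_iam_users above.

```hcl
# Sketch only: enforce_access_policies is a hypothetical higher-level pipeline.
pipeline "enforce_access_policies" {
  param "slack_channel" {
    type    = string
    default = "iam_hygiene"
  }

  # Reuse the sample pipeline as a single step.
  step "pipeline" "deactivate_expired_keys" {
    pipeline = pipeline.deactivate_expired_aws_iam_access_keys
    args = {
      slack_channel         = param.slack_channel
      aws_expire_after_days = 60
    }
  }

  # Further access-related policy checks would be added here as more steps.
}
```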

Schedules, Events and Triggers

DevOps is filled with routine work, which Flowpipe is happy to do on a schedule. Just set up a schedule trigger to run a pipeline at regular intervals:

trigger "schedule" "daily_3pm" {
  schedule = "0 15 * * *"
  pipeline = pipeline.daily_task
}
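
Applied to the earlier sample, a schedule trigger could run the key-hygiene pipeline every day. This is a sketch, assuming the trigger lives in the same mod as the pipeline and that schedule triggers accept an args map in the same way the http trigger does; the trigger name is hypothetical.

```hcl
# Sketch only: daily_key_hygiene is a hypothetical trigger name.
trigger "schedule" "daily_key_hygiene" {
  # Cron fields: minute hour day-of-month month day-of-week.
  # "0 15 * * *" fires once a day at 15:00.
  schedule = "0 15 * * *"
  pipeline = pipeline.deactivate_expired_aws_iam_access_keys

  args = {
    slack_channel         = "iam_hygiene"
    aws_expire_after_days = 60
  }
}
```

Note that the minute field matters: a leading * instead of 0 would fire every minute during the 15:00 hour rather than once.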

Events are critical to the world of AWS scripting — we need to respond immediately to code pushes, infrastructure change, Slack messages, etc. So Flowpipe has an http trigger to handle incoming webhooks and run a pipeline:

trigger "http" "my_webhook" {
  pipeline = pipeline.my_pipeline
  args = {
    event = self.request_body
  }
}
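
On the receiving side, the pipeline needs a param that matches the event argument. Here is a minimal sketch of what my_pipeline might look like, assuming the webhook body arrives as a JSON string; the step and output names are hypothetical.

```hcl
# Sketch only: a pipeline that accepts the webhook body passed by the trigger.
pipeline "my_pipeline" {
  param "event" {
    type        = string
    description = "Raw request body forwarded by the http trigger."
  }

  # Decode the JSON payload so later steps can reference its fields.
  step "transform" "parse_event" {
    value = jsondecode(param.event)
  }

  output "event" {
    value = step.transform.parse_event.value
  }
}
```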

The sweet spot for AWS automation

To get started with Flowpipe: download the tool, follow the tutorial, peruse the library mods, and check out the samples. Then let us know how it goes!

Tom Callaway

Tom is a Principal Open Source Evangelist for AWS. He has been a part of the open source community since 1997, when he skipped his last day of junior high to go to Linux Expo. During college, he worked for a high-availability startup to cover tuition, and when they crashed along with the majority of the IT sector, he dropped out of college and went to work for Red Hat full-time. He worked for Red Hat for almost twenty years, in Support, Sales Engineering, Release Engineering, Engineering Management, University Outreach (CTO’s office), and Employment Brand. He’s an active contributor to Fedora and helped to write the Fedora Packaging and Legal Guidelines which are still in use today. He’s spoken at a number of conferences and events including SxSW, OSCON, Open Source Summit, and Red Hat Summit. He has one patent on a crazy idea that never got implemented in the real world, and is co-author of Raspberry Pi Hacks (2013, O’Reilly). When he’s not working, he finds enjoyment in 3D printing, pinball, games (board & video), geocaching, craft beer, B-movies, science fiction, trivia, traveling, and his wife and two boys. He lives in Cary, NC. Tom is also known as “spot” by many people in the open source universe; he’s gone by that nickname since the 1st grade, and he happily answers to it. Follow him on Twitter @spotfoss.

Bob Tordella

Bob Tordella is the CRO of Turbot (https://turbot.com). He is recognized as a cloud governance leader who has enabled the world's largest enterprise organizations to secure and optimize their public cloud environments. Bob is currently improving the way teams manage their cloud security and operations using Turbot Guardrails and Turbot Pipes. He is also an advocate for Turbot's open source projects, which help engineers query, report on, and act on their cloud data: Steampipe.io, Powerpipe.io, and Flowpipe.io.

Jon Udell

Jon Udell is the community lead for Turbot's open source products: Steampipe, Powerpipe, and Flowpipe. He's known as a developer and tech journalist who explores and explains many kinds of software, and many ways of developing it. He has worked for Lotus Development, BYTE, InfoWorld, O'Reilly Media, and Microsoft.