Checkov Documentation

Create Custom Policy - Python - Attribute Check

Custom Policies created in code (in Python) support checking the state of a resource’s attributes. A Python-based Custom Policy for Checkov consists of sections for Metadata and Policy Definition.

See also how to create custom YAML policies for attribute and composite scanning.

Writing a Python custom Checkov policy

Specify a name, ID, relevant resources and categories.

Parameter | Description | Example/Comments
name | A new policy’s unique purpose. It should ideally specify the positive desired outcome of the policy. |
id | A mandatory unique identifier of a policy. Native policies written by Bridgecrew contributors follow the convention CKV_providerType_serialNumber. | CKV_AWS_9, CKV_GCP_12
supported_resources | Infrastructure objects, as described in the scanned IaC’s language. This usually contains one specific resource block. If you support multiple resources, you can use * to match any type of entity in that specific domain. Whether * can be used depends on which base check class you extend; see the note below the table. | ?ws_* will match anything where the second character is a 'w', the third is a 's' and the fourth is a '_'.
categories | Categorization of a scan. Usually used to produce compliance reports, pipeline analytics, infrastructure health metrics, etc. |
guideline | (Optional) Add extra info to help the user to solve the issue. | This is not needed

Note for Supported Resources Parameter: If you extend checkov.terraform.checks.resource.base_resource_check.BaseResourceCheck, the check is registered for all Terraform resources.
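
The wildcard example in the table reads like shell-style globbing. As a rough illustration only, and assuming (this page does not confirm it) that the matching behaves like Python's fnmatch rules, the sketch below shows which resource types a pattern such as ?ws_* would select. The resource names and the matching_resources helper are made up for the example and are not part of Checkov.

```python
from fnmatch import fnmatchcase

# Sample resource types, used only for illustration.
resource_types = [
    "aws_db_instance",
    "aws_s3_bucket",
    "google_sql_database_instance",
    "xws_custom_thing",
]


def matching_resources(pattern, candidates):
    """Return the candidates that match a shell-style wildcard pattern.

    Hypothetical helper: it assumes fnmatch semantics, where '?' matches
    exactly one character and '*' matches any run of characters.
    """
    return [name for name in candidates if fnmatchcase(name, pattern)]


matched = matching_resources("?ws_*", resource_types)
print(matched)
```

A pattern of * alone selects every entry, which is the "any type of entity" case mentioned in the table.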

The following example produces a policy that ensures that new RDS instances are encrypted at rest, given a scanned Terraform configuration (CKV_AWS_16).

  1. Create a new file in the AWS check directory checkov/terraform/checks/resource/aws/RDSEncryption.py.
  2. Import the following:
from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
  3. Define the meta entities for this check as described in the table above.
class RDSEncryption(BaseResourceCheck):
    def __init__(self):
        name = "Ensure all data stored in the RDS is securely encrypted at rest"
        id = "CKV_AWS_16"
        supported_resources = ['aws_db_instance']
        categories = [CheckCategories.ENCRYPTION]
        super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)
  4. Define a simple check of the aws_db_instance resource block to determine whether storage_encrypted is enabled. If it is missing or disabled, the check must return CheckResult.FAILED.
def scan_resource_conf(self, conf):
    """
        Looks for encryption configuration at aws_db_instance:
        https://www.terraform.io/docs/providers/aws/d/db_instance.html
    :param conf: aws_db_instance configuration
    :return: <CheckResult>
    """
    if 'storage_encrypted' in conf.keys():
        key = conf['storage_encrypted'][0]
        if key:
            return CheckResult.PASSED
    return CheckResult.FAILED
  5. Implement get_evaluated_keys so that the check results report shows the evaluated key.
from typing import List  # needed for the return annotation

def get_evaluated_keys(self) -> List[str]:
    return ['storage_encrypted/[0]']

If the evaluated keys are determined dynamically, you can set the evaluated key when scanning the resource configuration:

def scan_resource_conf(self, conf):
    """
        Looks for encryption configuration at aws_db_instance:
        https://www.terraform.io/docs/providers/aws/d/db_instance.html
    :param conf: aws_db_instance configuration
    :return: <CheckResult>
    """
    if 'storage_encrypted' in conf.keys():
        key = conf['storage_encrypted'][0]
        if key:
            # The following line sets the evaluated keys
            self.evaluated_keys = ['storage_encrypted/[0]']
            return CheckResult.PASSED
    return CheckResult.FAILED
  6. You can also add details to be printed in the execution report:
def scan_resource_conf(self, conf):
    """
        Looks for encryption configuration at aws_db_instance:
        https://www.terraform.io/docs/providers/aws/d/db_instance.html
    :param conf: aws_db_instance configuration
    :return: <CheckResult>
    """
    if 'storage_encrypted' in conf.keys():
        key = conf['storage_encrypted'][0]
        if key:
            # The following line sets the evaluated keys
            self.evaluated_keys = ['storage_encrypted/[0]']
            return CheckResult.PASSED

    self.details.append("'storage_encrypted' was not found on the resource configuration")

    return CheckResult.FAILED

The CLI report for the failed check then includes the details message.

  7. Finish the policy file by instantiating the check, so that Checkov can load it:
check = RDSEncryption()
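
To see how the decision logic from the steps above behaves without installing Checkov, here is a minimal standalone sketch. The CheckResult enum is a local stand-in for checkov.common.models.enums.CheckResult, and the sample conf dictionaries are hand-written assumptions about the parsed configuration, based only on the conf['storage_encrypted'][0] access shown above (attribute values wrapped in a list).

```python
from enum import Enum


class CheckResult(Enum):
    """Local stand-in for checkov.common.models.enums.CheckResult."""
    PASSED = "PASSED"
    FAILED = "FAILED"


def scan_resource_conf(conf):
    """Same decision logic as the RDSEncryption check, without the base class."""
    if "storage_encrypted" in conf:
        # Attribute values are wrapped in a list, so the value is at index 0.
        if conf["storage_encrypted"][0]:
            return CheckResult.PASSED
    return CheckResult.FAILED


# Hand-written sample configurations (assumed shape, for illustration only).
encrypted = {"engine": ["mysql"], "storage_encrypted": [True]}
unencrypted = {"engine": ["mysql"], "storage_encrypted": [False]}
unset = {"engine": ["mysql"]}

for sample in (encrypted, unencrypted, unset):
    print(scan_resource_conf(sample).value)
```

Only the first sample passes: an explicit False and a missing attribute both fall through to the final FAILED return.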

Selecting the best base check class to extend

Terraform and CloudFormation have two base classes extending BaseResourceCheck:

  1. BaseResourceValueCheck: This check will pass only if the inspected_key is within the expected_values. If get_expected_value is not implemented, the default value is [True].
from checkov.cloudformation.checks.resource.base_resource_value_check import BaseResourceValueCheck
from checkov.common.models.enums import CheckCategories, CheckResult


class RDSPubliclyAccessible(BaseResourceValueCheck):

    def __init__(self):
        name = "Ensure all data stored in RDS is not publicly accessible"
        id = "CKV_AWS_17"
        supported_resources = ['AWS::RDS::DBInstance']
        categories = [CheckCategories.NETWORKING]
        # A resource without the inspected key is treated as passing
        super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources,
                         missing_block_result=CheckResult.PASSED)

    def get_inspected_key(self):
        return 'Properties/PubliclyAccessible'

    def get_expected_values(self):
        return [False]

Another option is to return ANY_VALUE, which accepts any value for the inspected key:

from checkov.common.models.consts import ANY_VALUE

def get_expected_values(self):
    return [ANY_VALUE]
  2. BaseResourceNegativeValueCheck: This check will pass only if the inspected_key is NOT within the forbidden_values.
from typing import Any, List

from checkov.common.models.enums import CheckCategories
from checkov.terraform.checks.resource.base_resource_negative_value_check import BaseResourceNegativeValueCheck


class NeptuneClusterInstancePublic(BaseResourceNegativeValueCheck):
    def __init__(self):
        name = "Ensure Neptune Cluster instance is not publicly available"
        id = "CKV_AWS_102"
        supported_resources = ['aws_neptune_cluster_instance']
        categories = [CheckCategories.GENERAL_SECURITY]
        super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

    def get_inspected_key(self) -> str:
        return 'publicly_accessible/[0]'

    def get_forbidden_values(self) -> List[Any]:
        return [True]
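
The pass/fail rules of the two base classes can be modelled in a few lines. The following is a simplified sketch and not Checkov's implementation: resolve, value_check and negative_value_check are hypothetical helpers, the path syntax is inferred from the inspected keys shown above ('Properties/PubliclyAccessible', 'publicly_accessible/[0]'), and the missing_passes parameter is an assumption standing in for options such as missing_block_result.

```python
from typing import Any, List

_MISSING = object()  # sentinel for "the path does not exist in the configuration"


def resolve(conf: Any, path: str) -> Any:
    """Follow a 'key/[index]/key' style path; return _MISSING if a step is absent."""
    current = conf
    for part in path.split("/"):
        if part.startswith("[") and part.endswith("]"):
            index = int(part[1:-1])
            if not isinstance(current, list) or not 0 <= index < len(current):
                return _MISSING
            current = current[index]
        else:
            if not isinstance(current, dict) or part not in current:
                return _MISSING
            current = current[part]
    return current


def value_check(conf: dict, inspected_key: str, expected_values: List[Any],
                missing_passes: bool = False) -> bool:
    """Pass only if the inspected value is one of the expected values."""
    value = resolve(conf, inspected_key)
    if value is _MISSING:
        return missing_passes
    return value in expected_values


def negative_value_check(conf: dict, inspected_key: str, forbidden_values: List[Any],
                         missing_passes: bool = True) -> bool:
    """Pass only if the inspected value is NOT one of the forbidden values."""
    value = resolve(conf, inspected_key)
    if value is _MISSING:
        return missing_passes
    return value not in forbidden_values


# Hand-written sample configurations (assumed shapes, for illustration only).
cfn_private = {"Properties": {"PubliclyAccessible": False}}
tf_public = {"publicly_accessible": [True]}

print(value_check(cfn_private, "Properties/PubliclyAccessible", [False]))
print(negative_value_check(tf_public, "publicly_accessible/[0]", [True]))
```

The first call passes because False is an expected value; the second fails because True is forbidden. How a missing key is treated is the main design choice, which is why the sketch exposes it as a parameter rather than fixing it.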

Run a new scan

To run a scan with the new policy, use the checkov command.

checkov -d /user/tf

Working with Custom Policies

Checkov ships with a set of built-in policies that check for compliance and security best practices. In addition, Checkov can load external checks, which lets you author and execute your own custom policies.

Example

This example uses the following directory structure:

├── main.tf
├── variables.tf
└── outputs.tf

The example assumes a unique need to enforce bucket ACL policies only when the tag Scope=PCI is present. That being the case, the following bucket definition must trigger a failed check result:

# Snippet from  main.tf
resource "aws_s3_bucket" "credit_cards_bucket" {
  region        = var.region
  bucket        = local.bucket_name
  acl           = "public-read"
  force_destroy = true

  tags = {
    Scope = "PCI",
    
  }
}

To trigger the failed check result, you need to add a new check that ensures PCI-related S3 buckets stay private.

  1. Create a new Python package folder named my_extra_checks containing the new check:
├── main.tf
├── variables.tf
├── outputs.tf
└── my_extra_checks
    ├── __init__.py
    └── S3PCIPrivateACL.py

a. The first time you set up the custom checks folder, you also need to create a file named __init__.py.

from os.path import dirname, basename, isfile, join
import glob
modules = glob.glob(join(dirname(__file__), "*.py"))
__all__ = [ basename(f)[:-3] for f in modules if isfile(f) and not f.endswith('__init__.py')]

b. Complete the matching logic in S3PCIPrivateACL.py:

from lark import Token

from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.models.enums import CheckResult, CheckCategories


class S3PCIPrivateACL(BaseResourceCheck):
    def __init__(self):
        name = "Ensure PCI Scope buckets has private ACL (enable public ACL for non-pci buckets)"
        id = "CKV_AWS_999"
        supported_resources = ['aws_s3_bucket']
        # CheckCategories are defined in models/enums.py
        categories = [CheckCategories.BACKUP_AND_RECOVERY]
        guideline = "Follow the link to get more info https://docs.bridgecrew.io/docs"
        super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources, guideline=guideline)

    def scan_resource_conf(self, conf):
        """
            Looks for ACL configuration at aws_s3_bucket and Tag values:
            https://www.terraform.io/docs/providers/aws/r/s3_bucket.html
        :param conf: aws_s3_bucket configuration
        :return: <CheckResult>
        """
        if 'tags' in conf.keys():
            environment_tag = Token("IDENTIFIER", "Scope")
            if environment_tag in conf['tags'][0].keys():
                if conf['tags'][0][environment_tag] == "PCI":
                    if 'acl' in conf.keys():
                        acl_block = conf['acl']
                        if acl_block in [["public-read"], ["public-read-write"], ["website"]]:
                            return CheckResult.FAILED
        return CheckResult.PASSED


check = S3PCIPrivateACL()
  2. With the new custom check in place, run Checkov:
# install from pypi using pip
pip install checkov


# select an input folder that contains your terraform files and enable loading of extra checks
checkov -d . --external-checks-dir my_extra_checks

Verify the results:

Check: "Ensure PCI Scope buckets has private ACL (enable public ACL for non-pci buckets)"
	FAILED for resource: aws_s3_bucket.credit_cards_bucket
	File: /main.tf:80-90
	Guide: Follow the link to get more info https://docs.bridgecrew.io/docs

		80 | resource "aws_s3_bucket" "credit_cards_bucket" {
		81 |   region        = var.region
		82 |   bucket        = local.bucket_name
		83 |   acl           = "public-read"
		84 |   force_destroy = true
		85 |
		86 |   tags = {
		87 |     Scope = "PCI",
		88 |
		89 |   }
		90 | }
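
The result above follows from the condition in S3PCIPrivateACL: only a bucket that is tagged Scope=PCI and also uses one of the listed public ACLs fails. A minimal standalone sketch of that condition, assuming the parsed configuration wraps values in single-element lists as the check's comparisons suggest (the helper name and the sample dictionaries are made up for illustration):

```python
# ACL values, in the list-wrapped form the check compares against.
PUBLIC_ACLS = (["public-read"], ["public-read-write"], ["website"])


def pci_bucket_acl_is_private(conf: dict) -> bool:
    """Return False only for a bucket tagged Scope=PCI that uses a public ACL."""
    tags_list = conf.get("tags") or [{}]
    tags = tags_list[0]
    if tags.get("Scope") == "PCI" and conf.get("acl") in PUBLIC_ACLS:
        return False
    return True


# Hand-written sample configurations (assumed shape, for illustration only).
pci_public = {"acl": ["public-read"], "tags": [{"Scope": "PCI"}]}
pci_private = {"acl": ["private"], "tags": [{"Scope": "PCI"}]}
non_pci_public = {"acl": ["public-read"], "tags": [{"Scope": "Marketing"}]}
untagged_public = {"acl": ["public-read"]}

for sample in (pci_public, pci_private, non_pci_public, untagged_public):
    print(pci_bucket_acl_is_private(sample))
```

Only pci_public is rejected, which corresponds to the credit_cards_bucket definition in main.tf; public ACLs on buckets outside the PCI scope are deliberately left alone.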

Attention: Policies cannot share the same file name. If two policies with the same file name exist, only the first one will be loaded.
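
File names matter because the __init__.py shown earlier derives module names directly from them. The sketch below reuses that expression on a temporary folder to show what it evaluates to. The discover_check_modules helper and the extra notes.txt file are hypothetical, and the result is sorted only to make the output deterministic.

```python
import glob
import tempfile
from os.path import basename, isfile, join
from pathlib import Path


def discover_check_modules(folder: str) -> list:
    """Mirror the __all__ expression from __init__.py for an arbitrary folder."""
    modules = glob.glob(join(folder, "*.py"))
    # Strip the ".py" suffix and skip the package's own __init__.py.
    return sorted(
        basename(f)[:-3]
        for f in modules
        if isfile(f) and not f.endswith("__init__.py")
    )


with tempfile.TemporaryDirectory() as folder:
    # Recreate the layout of my_extra_checks, plus one non-Python file.
    for filename in ("__init__.py", "S3PCIPrivateACL.py", "notes.txt"):
        Path(folder, filename).touch()
    discovered = discover_check_modules(folder)

print(discovered)
```

Only the check file is picked up: __init__.py is excluded explicitly and notes.txt does not match the *.py pattern.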
