image/svg+xml Checkov home
  • Docs
    • Quick start
    • Overview
    • Integrations
  • Download
  • Docs
    • Quick start
    • Overview
    • Integrations

Checkov Documentation

  • 1.Welcome
    • What is Checkov?
    • Terms and Concepts
    • Quick Start
    • Feature Descriptions
    • Migration
  • 2.Basics
    • Installing Checkov
    • CLI Command Reference
    • Suppressing and Skipping Policies
    • Hard and soft fail
    • Scanning Credentials and Secrets
    • Reviewing Scan Results
    • Visualizing Checkov Output
    • Handling Variables
  • 3.Custom Policies
    • Custom Policies Overview
    • Create Custom Policy - Python - Attribute Check
      • Writing a Python custom Checkov policy
        • Selecting the best base check class to extend
        • Run a new scan
      • Example
      • Signing custom checks (optional)
        • Constraints
        • Generating a key pair
        • Signing a single check file
        • Running Checkov with verification enabled
        • Rotating keys
    • YAML Custom Policies
    • Custom YAML Policies Examples
    • Sharing Custom Policies
  • 4.Integrations
    • Jenkins
    • Bitbucket Cloud Pipelines
    • GitHub Actions
    • GitLab CI
    • Kubernetes
    • Pre-Commit Hooks
    • Docker
  • 5.Policy Index
    • all resource scans
    • ansible resource scans
    • argo_workflows resource scans
    • arm resource scans
    • azure_pipelines resource scans
    • bicep resource scans
    • bitbucket_configuration resource scans
    • bitbucket_pipelines resource scans
    • circleci_pipelines resource scans
    • cloudformation resource scans
    • dockerfile resource scans
    • github_actions resource scans
    • github_configuration resource scans
    • gitlab_ci resource scans
    • gitlab_configuration resource scans
    • kubernetes resource scans
    • openapi resource scans
    • secrets resource scans
    • serverless resource scans
    • terraform resource scans
  • 6.Contribution
    • Checkov Runner Contribution Guide
    • Implementing CI Metadata extractor
    • Implementing ImageReferencer
    • Contribution Overview
    • Contribute Python-Based Policies
    • Contribute YAML-based Policies
    • Contribute New Terraform Provider
    • Contribute New Argo Workflows configuration policy
    • Contribute New Azure Pipelines configuration policy
    • Contribute New Bitbucket configuration policy
    • Contribute New GitHub configuration policy
    • Contribute New Gitlab configuration policy
  • 7.Scan Examples
    • Terraform Plan Scanning
    • Terraform Scanning
    • Helm
    • Kustomize
    • AWS SAM configuration scanning
    • Ansible configuration scanning
    • Argo Workflows configuration scanning
    • Azure ARM templates configuration scanning
    • Azure Pipelines configuration scanning
    • Azure Bicep configuration scanning
    • Bitbucket configuration scanning
    • AWS CDK configuration scanning
    • Cloudformation configuration scanning
    • Dockerfile configuration scanning
    • GitHub configuration scanning
    • Gitlab configuration scanning
    • Kubernetes configuration scanning
    • OpenAPI configuration scanning
    • SCA scanning
    • Serverless framework configuration scanning
  • 8.Outputs
    • CSV
    • CycloneDX BOM
    • GitLab SAST
    • JUnit XML
    • SARIF
  • Docs
  • 3.custom policies
  • Python Custom Policies
Edit on GitHub

Create Custom Policy - Python - Attribute Check

Custom Policies created in code (in Python) support checking the state of a resource’s attributes. A Python-based Custom Policy for Checkov consists of sections for Metadata and Policy Definition.

Read also how to create custom YAML Policies for attribute and composite scanning.

Writing a Python custom Checkov policy

Specify a name, ID, relevant resources and categories.

Parameter Description Example/Comments
name A new policy’s unique purpose. It should ideally specify the positive desired outcome of the policy.  
id A mandatory unique identifier of a policy. Native policies written by Prisma Cloud contributors will follow the following convention: CKV_providerType_serialNumber CKV_AWS_9 , CKV_GCP_12
supported_resources Infrastructure objects, as described in the scanned IaC’s language. This usually contains one specific resource block. If you support multiple resources, you can use * to match any type of entity in that specific domain. * use depends on which check base class you extend; see note below table. ?ws_* will match anything where the second character is a 'w', the third is a 's' and the fourth is a '_'.
categories Categorization of a scan. Usually used to produce compliance reports, pipeline analytics and infrastructure health metrics, etc.  
guideline (Optional) Add extra info to help the user to solve the issue. This is not needed

Note for Supported Resources Parameter: If you extend checkov.terraform.checks.resource.base_resource_check.BaseResourceCheck, the check is registered for all Terraform resources.

The following example produces a policy that ensures that new RDS services spun-up are encrypted at rest, given a scanned Terraform configuration (CKV_AWS_16).

  1. Create a new file in the AWS check directory checkov/terraform/checks/resource/aws/RDSEncryption.py.
  2. Import the following:
from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
  1. Define the meta entities for this check as described in the table above.
class RDSEncryption(BaseResourceCheck):
    def __init__(self) -> None:
        name = "Ensure all data stored in the RDS is securely encrypted at rest"
        id = "CKV_AWS_16"
        supported_resources = ("aws_db_instance",)
        categories = (CheckCategories.ENCRYPTION,)
        super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)
  1. Define a simple check of the aws_db_instance resource block to determine if aws_db_instance is disabled. If it is disabled, that needs to cause a CheckResult.FAILED to occur.
def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
    """
        Looks for encryption configuration at aws_db_instance:
        https://www.terraform.io/docs/providers/aws/d/db_instance.html
    :param conf: aws_db_instance configuration
    :return: <CheckResult>
    """
    if 'storage_encrypted' in conf.keys():
        key = conf['storage_encrypted'][0]
        if key:
            return CheckResult.PASSED
    return CheckResult.FAILED

Note:

The conf parameter is dependent on the resource type, which was chosen via the supported_resources class instance attribute. For example, for the aws_db_instance resource, we get the following value:

conf = {
    "__end_line__": 11,  # internal field
    "__start_line__": 3,  # internal field
    "allocated_storage": [5],
    "enabled_cloudwatch_logs_exports": [["postgresql", "upgrade"]],
    "engine": ["postgres"],
    "instance_class": ["db.t3.small"],
    "password": ["postgres"],
    "username": ["postgres"],
    "__address__": "aws_db_instance.postgres",  # internal field
}

which is the internal representation of following Terraform resource block

resource "aws_db_instance" "postgres" {
  allocated_storage = 5
  engine            = "postgres"
  instance_class    = "db.t3.small"
  password          = "postgres"
  username          = "postgres"

  enabled_cloudwatch_logs_exports = ["postgresql", "upgrade"]
}

If more than one resource type was set for supported_resources, then it is possible to retrieve the info via the class instance attribute self.entity_type.

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
    if self.entity_type == "aws_db_instance":
        ...
    elif self.entity_type == "aws_rds_cluster_instance":
        ...
  1. Implement get_evaluated_keys to allow the check results report show the specified key.
def get_evaluated_keys(self) -> List[str]:
    return ['storage_encrypted/[0]']

If the evaluated keys are determined dynamically, you can set the evaluated key when scanning the resource configuration:

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
    """
        Looks for encryption configuration at aws_db_instance:
        https://www.terraform.io/docs/providers/aws/d/db_instance.html
    :param conf: aws_db_instance configuration
    :return: <CheckResult>
    """
    if 'storage_encrypted' in conf.keys():
        key = conf['storage_encrypted'][0]
        if key:
            # The following line sets the evaluated keys
            self.evaluated_keys = ['storage_encrypted/[0]']
            return CheckResult.PASSED
    return CheckResult.FAILED
  1. You can also add details to be printed on the execution report:
    def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
     """
         Looks for encryption configuration at aws_db_instance:
         https://www.terraform.io/docs/providers/aws/d/db_instance.html
     :param conf: aws_db_instance configuration
     :return: <CheckResult>
     """
     if 'storage_encrypted' in conf.keys():
         key = conf['storage_encrypted'][0]
         if key:
             # The following line sets the evaluated keys
             self.evaluated_keys = ['storage_encrypted/[0]']
             return CheckResult.PASSED
            
     self.details.append("'storage_encrypted' was not found on the resource configuration")
        
     return CheckResult.FAILED
    

Produces the following CLI report: details-cli-screenshot

  1. Conclude the policy name and operationalize it with the statement:
check = RDSEncryption()

Selecting the best base check class to extend

Terraform and CloudFormation have two base classes extending BaseResourceCheck:

  1. BaseResourceValueCheck: This check will pass only if the inspected_key is within the expected_values. If get_expected_value is not implemented, the default value is [True].
class RDSPubliclyAccessible(BaseResourceValueCheck):
    def __init__(self) -> None:
        name = "Ensure all data stored in RDS is not publicly accessible"
        id = "CKV_AWS_17"
        supported_resources = ("AWS::RDS::DBInstance",)
        categories = (CheckCategories.NETWORKING,)
        super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources,
                         missing_block_result=CheckResult.PASSED)
    
    def get_inspected_key(self) -> str:
        return 'Properties/PubliclyAccessible'    
        
    def get_expected_values(self) -> list[Any]:
        return [False]

Another option is to use ANY_VALUE:

def get_expected_values(self) -> list[Any]:
    return [ANY_VALUE]
  1. BaseResourceNegativeValueCheck: This check will pass only if the inspected_key is NOT within the forbidden_values.
class NeptuneClusterInstancePublic(BaseResourceNegativeValueCheck):
    def __init__(self) -> None:
        name = "Ensure Neptune Cluster instance is not publicly available"
        id = "CKV_AWS_102"
        supported_resources = ['aws_neptune_cluster_instance']
        categories = [CheckCategories.GENERAL_SECURITY]
        super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

    def get_inspected_key(self) -> str:
        return 'publicly_accessible/[0]'

    def get_forbidden_values(self) -> List[Any]:
        return [True]

Run a new scan

To run a scan with the new policy, use the checkov command.

checkov -d /user/tf

#Working with Custom Policies

Checkov is delivered with a set of built-in policies that check for compliance and security best practices at its core. In addition, Checkov enables you to load additional checks, that give the user the ability to author and execute custom policies.

Example

This example uses the following directory structure:

├── main.tf
├── variables.tf
└── outputs.tf

The example assumes a unique need to enforce bucket ACL policies only when the tag Scope=PCI is present. That being the case, the following bucket definition must trigger a failed check result:

# Snippet from  main.tf
resource "aws_s3_bucket" "credit_cards_bucket" {
  region        = var.region
  bucket        = local.bucket_name
  acl           = "public-read"
  force_destroy = true

  tags = {
    Scope = "PCI",
    
  }
}

To trigger the failed check result, you need to add a new check to ensure PCI related S3 buckets will stay private.

  1. Create a new python folder named my_extra_checks containing the new check:
├── main.tf
├── variables.tf
└── outputs.tf
└── my_extra_checks
    └── __init__.py
    └── S3PCIPrivateACL.py

a. The first time you setup the custom checks folder, you need to also create a file named __init__.py.

from os.path import dirname, basename, isfile, join
import glob
modules = glob.glob(join(dirname(__file__), "*.py"))
__all__ = [ basename(f)[:-3] for f in modules if isfile(f) and not f.endswith('__init__.py')]

b. Complete the matching logic in S3PCIPrivateACL.py:

from __future__ import annotations

from typing import Any

from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.models.enums import CheckResult, CheckCategories


class S3PCIPrivateACL(BaseResourceCheck):
    def __init__(self) -> None:
        name = "Ensure PCI Scope buckets has private ACL (enable public ACL for non-pci buckets)"
        id = "CKV_AWS_999"
        supported_resources = ("aws_s3_bucket",)
        # CheckCategories are defined in models/enums.py
        categories = (CheckCategories.BACKUP_AND_RECOVERY,)
        guideline = "Follow the link to get more info https://docs.prismacloud.io/en/enterprise-edition/policy-reference"
        super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources, guideline=guideline)

    def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
        """
            Looks for ACL configuration at aws_s3_bucket and Tag values:
            https://www.terraform.io/docs/providers/aws/r/s3_bucket.html
        :param conf: aws_s3_bucket configuration
        :return: <CheckResult>
        """
        tags = conf.get("tags")
        if tags and isinstance(tags, list):
            tags = tags[0]
            if tags.get("Scope") == "PCI":
                acl_block = conf['acl']
                if acl_block in [["public-read"], ["public-read-write"], ["website"]]:
                    return CheckResult.FAILED
        return CheckResult.PASSED


check = S3PCIPrivateACL()
  1. With the new custom check in place, run Checkov:
# install from pypi using pip
pip install checkov


# select an input folder that contains your terraform files and enable loading of extra checks
checkov -d . --external-checks-dir my_extra_checks

Verify the results:

Check: "Ensure PCI Scope buckets has private ACL (enable public ACL for non-pci buckets)"
	FAILED for resource: aws_s3_bucket.credit_cards_bucket
	File: /main.tf:80-90
	Guide: Follow the link to get more info https://docs.prismacloud.io/en/enterprise-edition/policy-reference

		80 | resource "aws_s3_bucket" "credit_cards_bucket" {
		81 |   region        = var.region
		82 |   bucket        = local.bucket_name
		83 |   acl           = "public-read"
		84 |   force_destroy = true
		85 |
		86 |   tags = {
		87 |     Scope = "PCI",
		88 |
		89 |   }
		90 | }

Attention: Policies cannot share the same file name. If two policies with the same file name exist, only the first one will be loaded.

Signing custom checks (optional)

By default, Checkov loads every .py file under --external-checks-dir (or --external-checks-git) and executes it. In environments where the custom-checks tree must be tamper-evident, Checkov can verify a cryptographic signature on each file before loading it.

Enable verification by passing one or more PEM-encoded ECDSA P-256 public keys via --external-checks-public-key (or the CKV_EXTERNAL_CHECKS_PUBLIC_KEY environment variable). When at least one key is configured, every .py file under the custom-checks directory must end with a # checkov-digest: <hex> trailer line signed by one of the configured keys. Files without a valid trailer cause Checkov to exit with code 2 before any scan runs.

When --external-checks-public-key is not set, no verification is performed and behaviour is unchanged.

Constraints

  • Trailer signing covers .py files only. Binary loadable file types (.pyc, .so, .pyd, .pyi) cannot be trailer-signed and are rejected by the verifier. Distribute compiled extensions via pip install and import them by package name; do not place them in --external-checks-dir.
  • Sign after all code-formatting steps in your CI (after black, ruff format, end-of-file-fixer, and any “trim trailing whitespace” editor hooks) and before the commit. Formatters can invalidate the trailer if they run after signing.
  • The trailer is the very last line of the file, terminated by exactly one \n. No CRLF, no BOM, no trailing whitespace.
  • Do not sign the same file twice. Each signing run must start from the unsigned file. If your signing pipeline can be re-triggered (e.g. a pre-commit hook), make sure it detects an existing trailer and either skips the file or strips the trailer before re-signing. A file that ends in two trailer lines is rejected with a file appears to be signed twice error.
  • Verification covers --external-checks-dir and --external-checks-git only. Policies fetched from the Bridgecrew/Prisma Cloud platform (visible inside Checkov as sast_custom_policies) are authenticated separately by the platform integration’s TLS-authenticated fetch and are not in scope for trailer verification.
  • Same-stem checks across subdirectories are not supported in v1. Checkov imports each external check by its bare filename stem, so checks/aws/helper.py and checks/azure/helper.py collide on the name helper. Keep helper filenames unique across a single --external-checks-dir tree.

Generating a key pair

openssl ecparam -name prime256v1 -genkey -noout -out priv.pem
openssl ec -in priv.pem -pubout -out pub.pem

Keep priv.pem in a secure store (HSM, KMS, or your CI secret store). Distribute pub.pem to anyone running Checkov.

Signing a single check file

Given an unsigned aws_check.py:

# 1. Refuse to re-sign — bail out if the file already ends in a trailer line.
if tail -n 1 aws_check.py | grep -q '^# checkov-digest: '; then
  echo "aws_check.py already has a trailer; refusing to double-sign." >&2
  exit 1
fi

# 2. Produce a DER ECDSA-SHA256 signature over the unsigned file bytes,
#    hex-encode it on one line, and append the trailer.
HEX=$(openssl dgst -sha256 -sign priv.pem aws_check.py \
        | od -An -tx1 \
        | tr -d ' \n')
printf '# checkov-digest: %s\n' "$HEX" >> aws_check.py

The od -An -tx1 | tr -d ' \n' pipeline produces a single-line lowercase-hex string and is portable across both GNU and BSD/macOS toolchains. (xxd -p is also lowercase but its line-wrapping behaviour varies between distributions, and the verifier rejects multi-line trailers.)

After signing, the file’s last line will look like:

# checkov-digest: 30450220...c3a4

Repeat for every .py file in the custom-checks tree, including each __init__.py (an empty __init__.py becomes a file whose entire contents is just the trailer line).

Running Checkov with verification enabled

checkov -d . \
  --external-checks-dir my_extra_checks \
  --external-checks-public-key pub.pem

On success, Checkov loads the verified checks and proceeds normally. On any failure (missing trailer, bad signature, unknown key, presence of a .pyc/.so/.pyd/.pyi file), Checkov prints the offending paths to stderr and exits with code 2 without running the scan.

Rotating keys

Pass --external-checks-public-key multiple times (or list multiple paths in the env var). Each file is accepted if it verifies under any one of the configured keys, which lets you publish a new key alongside the old one and re-sign on your own schedule.

checkov -d . --external-checks-dir my_extra_checks \
  --external-checks-public-key pub_old.pem \
  --external-checks-public-key pub_new.pem

Powered By

  • Slack Community
  • Prisma Cloud
  • Terms of use
  • GitHub
  • Docs
  • Privacy policy