Thursday, January 22, 2026

Automate the export of Amazon RDS for MySQL or Amazon Aurora MySQL audit logs to Amazon S3 with batching or close to real-time processing


Audit logging has turn into a vital part of database safety and compliance, serving to organizations observe consumer actions, monitor knowledge entry patterns, and preserve detailed information for regulatory necessities and safety investigations. Database audit logs present a complete path of actions carried out throughout the database, together with queries executed, modifications made to knowledge, and consumer authentication makes an attempt. Managing these logs is extra simple with a strong storage answer resembling Amazon Easy Storage Service (Amazon S3).

Amazon Relational Database Service (Amazon RDS) for MySQL and Amazon Aurora MySQL-Suitable Version present built-in audit logging capabilities, however prospects would possibly must export and retailer these logs for long-term retention and evaluation. Amazon S3 affords an excellent vacation spot, offering sturdiness, cost-effectiveness, and integration with numerous analytics instruments.

On this put up, we discover two approaches for exporting MySQL audit logs to Amazon S3: both utilizing batching with a local export to Amazon S3 or processing logs in actual time with Amazon Information Firehose.

Answer overview

The primary answer entails batch processing by utilizing the built-in audit log export function in Amazon RDS for MySQL or Aurora MySQL-Suitable to export logs to Amazon CloudWatch Logs. Amazon EventBridge periodically triggers an AWS Lambda perform. This answer creates a CloudWatch export activity that sends the final one days’s of audit logs to Amazon S3. The interval (in the future) is configurable primarily based in your necessities. This answer is probably the most cost-effective and sensible in case you don’t require the audit logs to be accessible in real-time inside an S3 bucket. The next diagram illustrates this workflow.

The opposite proposed answer makes use of Information Firehose to right away course of the MySQL audit logs inside CloudWatch Logs and ship them to an S3 bucket. This strategy is appropriate for enterprise use instances that require rapid export of audit logs after they’re accessible inside CloudWatch Logs. The next diagram illustrates this workflow.

Overview of solution 2

Use instances

When you’ve carried out both of those options, you’ll have your Aurora MySQL or RDS for MySQL audit logs saved securely in Amazon S3. This opens up a wealth of prospects for evaluation, monitoring, and compliance reporting. Right here’s what you are able to do together with your exported audit logs:

  • Run Amazon Athena queries: Along with your audit logs in S3, you should utilize Amazon Athena to run SQL queries immediately in opposition to your log knowledge. This lets you shortly analyze consumer actions, determine uncommon patterns, or generate compliance stories. For instance, you may question for all actions carried out by a selected consumer, or discover all failed login makes an attempt inside a sure time-frame.
  • Create Amazon Fast Sight dashboards: Utilizing Amazon Fast Sight at the side of Athena, you possibly can create visible dashboards of your audit log knowledge. This might help you see developments over time, resembling peak utilization hours, most energetic customers, or continuously accessed database objects.
  • Arrange automated alerting: By combining your S3-stored logs with AWS Lambda and Amazon SNS, you possibly can create automated alerts for particular occasions. For example, you may arrange a system to inform safety personnel if there’s an uncommon spike in failed login makes an attempt or if delicate tables are accessed outdoors of enterprise hours.
  • Carry out long-term evaluation: Along with your audit logs centralized in S3, you possibly can carry out long-term pattern evaluation. This might show you how to perceive how database utilization patterns change over time, informing capability planning and safety insurance policies.
  • Meet compliance necessities: Many regulatory frameworks require retention and evaluation of database audit logs. Along with your logs in S3, you possibly can simply reveal compliance with these necessities, working stories as wanted for auditors.

By leveraging these capabilities, you possibly can flip your audit logs from a passive safety measure into an energetic instrument for database administration, safety enhancement, and enterprise intelligence.

Evaluating options

The primary answer used EventBridge to periodically set off a Lambda perform. This perform creates a CloudWatch Log export activity that sends a batch of log knowledge to Amazon S3 at common intervals. This methodology is well-suited for eventualities the place you favor to course of logs in batches to optimize prices and assets.

The second answer makes use of Information Firehose to create a real-time audit log processing pipeline. This strategy streams logs immediately from CloudWatch to an S3 bucket, offering close to real-time entry to your audit knowledge. On this context, “real-time” signifies that log knowledge is processed and delivered synchronously as it’s generated, somewhat than being despatched in a pre-defined interval. This answer is good for eventualities requiring rapid entry to log knowledge or for high-volume logging environments.

Whether or not you select the close to real-time streaming strategy or the scheduled export methodology, you may be well-equipped to managed your Aurora MySQL and RDS for MySQL audit logs successfully.

Stipulations for each options

Earlier than getting began, full the next conditions:

  1. Create or have an present RDS for MySQL occasion or Aurora MySQL cluster.
  2. Allow audit logging:
    1. For Amazon RDS, add the MariaDB Audit Plugin inside your possibility group.
    2. For Aurora, allow Superior Auditing inside your parameter group.

Word: In audit logging, by default all customers are logged which might doubtlessly be pricey.

  1. Publish MySQL audit logs to CloudWatch Logs.
  2. Ensure you have a terminal with the AWS Command Line Interface (AWS CLI) put in or use AWS CloudShell inside your console.
  3. Create an S3 bucket to retailer the MySQL audit logs utilizing the under AWS CLI command:

aws s3api create-bucket --bucket

After the command is full, you will note an output just like the next:

Word: Every answer has particular service parts that are mentioned of their respective sections.

Answer #1: Peform audit log batch processing with EventBridge and Lambda

On this answer, we create a Lambda perform to export your audit log to Amazon S3 primarily based on the schedule you set utilizing EventBridge Scheduler. This answer affords a cost-efficient strategy to switch audit log recordsdata inside an S3 bucket in a scheduled method.

Create IAM function for EventBridge Scheduler

Step one is to create an AWS Id and Entry Administration (IAM) function answerable for permitting EventBridge Scheduler to invoke the Lambda perform we are going to create later. Full the next steps to create this function:

  1. Hook up with a terminal with the AWS CLI or CloudShell.
  2. Create a file named TrustPolicyForEventBridgeScheduler.json utilizing your most popular textual content editor:

nano TrustPolicyForEventBridgeScheduler.json

  1. Insert the next belief coverage into the JSON file:
{
    "Model": "2012-10-17",
    "Assertion": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "scheduler.amazonaws.com"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": ""
                }
            }
        }
    ]
}

Word: Ensure to amend SourceAccount earlier than saving right into a file. The situation is used to prevents unauthorized entry from different AWS accounts.

  1. Create a file named PermissionsForEventBridgeScheduler.json utilizing your most popular textual content editor:

nano PermissionsForEventBridgeScheduler.json

  1. Insert the next permissions into the JSON file:
{
    "Model": "2012-10-17",
    "Assertion": [
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Useful resource": [
                "arn:aws:lambda:::function:*",
                "arn:aws:lambda:::function"
            ]
        }
    ]
}

Word: Exchange with the title of the perform you’ll create later.

  1. Use the next AWS CLI command to create the IAM function for EventBridge Scheduler to invoke the Lambda perform:
aws iam create-role 
--role-name EventBridgeSchedulerLambdaInvoke 
--assume-role-policy-document file://TrustPolicyForEventBridgeScheduler.json

  1. Create the IAM coverage and fix it to the beforehand created IAM function:
aws iam put-role-policy --role-name EventBridgeSchedulerLambdaInvokeRole --policy-name EventBridgeSchedulerLambdaInvoke --policy-document file://PermissionsForEventBridgeScheduler.json

On this part, we created an IAM function with applicable belief and permissions insurance policies that enable EventBridge Scheduler to securely invoke Lambda features out of your AWS account. Subsequent, we’ll create one other IAM function that defines the permissions that your Lambda perform must execute its duties.

Create IAM function for Lambda

The following step is to create an IAM function answerable for permitting Lambda to place information from CloudWatch into your S3 bucket. Full the next steps to create this function:

  1. Hook up with a terminal with the AWS CLI or CloudShell.
  2. Create and write to a JSON file for the IAM belief coverage utilizing your most popular textual content editor:

nano TrustPolicyForLambda.json

  1. Insert the next belief coverage into the JSON file:
{
    "Model": "2012-10-17",
    "Assertion": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

  1. Use the next AWS CLI command to create the IAM function for Lambda to insert information from CloudWatch to Amazon S3:
aws iam create-role 
--role-name LambdaCWtoS3Role 
--assume-role-policy-document file://TrustPolicyForLambda.json

  1. Create a file named PermissionsForLambda.json utilizing your most popular textual content editor:

nano PermissionsForLambda.json

  1. Insert the next permissions into the JSON file:
{
    "Model": "2012-10-17",
    "Assertion": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetBucketLocation"
            ],
            "Useful resource": [
                "arn:aws:s3:::",
                "arn:aws:s3:::/*"
            ]
        },
        {
            "Impact": "Enable",
            "Motion": [
                "logs:CreateExportTask"
            ],
            "Useful resource": "arn:aws:logs:*:*:log-group:/aws/rds/occasion/*/audit:*"
        },
        {
            "Impact": "Enable",
            "Motion": [
                "logs:DescribeExportTasks"
            ],
            "Useful resource": "*"
        },
        {
            "Impact": "Enable",
            "Motion": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Useful resource": "arn:aws:logs:*:*:log-group:/aws/lambda/*"
        }
    ]
}

  1. Create the IAM coverage and fix it to the beforehand created IAM function:
aws iam put-role-policy --role-name LambdaCWtoS3Role --policy-name LambdaPermissions --policy-document file://PermissionsForLambda.json

Create ZIP file for the Python Lambda perform

To create a file with the code the Lambda perform will invoke, full the next steps:

  1. Create and write to a file named lambda_function.py utilizing your most popular textual content editor:

nano lambda_function.py

  1. Inside the file, insert the next code:
import boto3
import os
import datetime
import logging
import time
from botocore.exceptions import ClientError, NoCredentialsError, BotoCoreError
# Arrange logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def check_active_export_tasks(consumer):
    """Test for any energetic export duties"""
    attempt:
        response = consumer.describe_export_tasks()
        active_tasks = [
            task for task in response.get('exportTasks', [])
            if activity.get('standing', {}).get('code') in ['RUNNING', 'PENDING']
        ]
        return active_tasks
    besides ClientError as e:
        logger.error(f"Error checking energetic export duties: {e}")
        return []
def wait_for_export_task_completion(consumer, max_wait_minutes=15, check_interval=60):
    """Await any energetic export duties to finish"""
    max_wait_seconds = max_wait_minutes * 60
    waited_seconds = 0
    
    whereas waited_seconds < max_wait_seconds:
        active_tasks = check_active_export_tasks(consumer)
        
        if not active_tasks:
            logger.data("No energetic export duties discovered, continuing...")
            return True
        
        logger.data(f"Discovered {len(active_tasks)} energetic export activity(s). Ready {check_interval} seconds...")
        for activity in active_tasks:
            task_id = activity.get('taskId', 'Unknown')
            standing = activity.get('standing', {}).get('code', 'Unknown')
            logger.data(f"Energetic activity ID: {task_id}, Standing: {standing}")
        
        time.sleep(check_interval)
        waited_seconds += check_interval
    
    logger.warning(f"Timed out ready for export duties to finish after {max_wait_minutes} minutes")
    return False
def lambda_handler(occasion, context):
    attempt:
        # Atmosphere variable validation
        required_env_vars = ['GROUP_NAME', 'DESTINATION_BUCKET', 'PREFIX', 'NDAYS']
        missing_vars = [var for var in required_env_vars if not os.environ.get(var)]
        
        if missing_vars:
            error_msg = f"Lacking required surroundings variables: {', '.be a part of(missing_vars)}"
            logger.error(error_msg)
            return {
                'statusCode': 400,
                'physique': {'error': error_msg}
            }
        
        # Get surroundings variables
        GROUP_NAME = os.environ['GROUP_NAME'].strip()
        DESTINATION_BUCKET = os.environ['DESTINATION_BUCKET'].strip()
        PREFIX = os.environ['PREFIX'].strip()
        NDAYS = os.environ['NDAYS'].strip()
        
        # Optionally available: Get retry configuration from surroundings
        MAX_WAIT_MINUTES = int(os.environ.get('MAX_WAIT_MINUTES', '30'))
        CHECK_INTERVAL = int(os.environ.get('CHECK_INTERVAL', '60'))
        RETRY_ON_CONCURRENT = os.environ.get('RETRY_ON_CONCURRENT', 'true').decrease() == 'true'
        
        # Validate surroundings variables usually are not empty
        if not all([GROUP_NAME, DESTINATION_BUCKET, PREFIX, NDAYS]):
            error_msg = "Atmosphere variables can't be empty"
            logger.error(error_msg)
            return {
                'statusCode': 400,
                'physique': {'error': error_msg}
            }
        
        # Convert and validate NDAYS
        attempt:
            nDays = int(NDAYS)
            if nDays <= 0:
                elevate ValueError("NDAYS should be a constructive integer")
        besides ValueError as e:
            error_msg = f"Invalid NDAYS worth '{NDAYS}': {str(e)}"
            logger.error(error_msg)
            return {
                'statusCode': 400,
                'physique': {'error': error_msg}
            }
        
        # Date calculations with validation
        attempt:
            currentTime = datetime.datetime.now()
            StartDate = currentTime - datetime.timedelta(days=nDays)
            EndDate = currentTime - datetime.timedelta(days=nDays - 1)
            
            fromDate = int(StartDate.timestamp() * 1000)
            toDate = int(EndDate.timestamp() * 1000)
            
            # Validate date vary
            if fromDate >= toDate:
                elevate ValueError("Invalid date vary: fromDate should be lower than toDate")
                
        besides (ValueError, OverflowError) as e:
            error_msg = f"Date calculation error: {str(e)}"
            logger.error(error_msg)
            return {
                'statusCode': 400,
                'physique': {'error': error_msg}
            }
        
        # Create S3 path
        attempt:
            BUCKET_PREFIX = os.path.be a part of(PREFIX, StartDate.strftime('%Y{0}%m{0}%d').format(os.path.sep))
        besides Exception as e:
            error_msg = f"Error creating bucket prefix: {str(e)}"
            logger.error(error_msg)
            return {
                'statusCode': 500,
                'physique': {'error': error_msg}
            }
        
        # Log the export particulars
        logger.data(f"Beginning export activity for log group: {GROUP_NAME}")
        logger.data(f"Date vary: {StartDate.strftime('%Y-%m-%d')} to {EndDate.strftime('%Y-%m-%d')}")
        logger.data(f"Vacation spot: s3://{DESTINATION_BUCKET}/{BUCKET_PREFIX}")
        
        # Create boto3 consumer with error dealing with
        attempt:
            consumer = boto3.consumer('logs')
        besides NoCredentialsError:
            error_msg = "AWS credentials not discovered"
            logger.error(error_msg)
            return {
                'statusCode': 500,
                'physique': {'error': error_msg}
            }
        besides Exception as e:
            error_msg = f"Error creating boto3 consumer: {str(e)}"
            logger.error(error_msg)
            return {
                'statusCode': 500,
                'physique': {'error': error_msg}
            }
        
        # Test for energetic export duties earlier than creating a brand new one
        if RETRY_ON_CONCURRENT:
            logger.data("Checking for energetic export duties...")
            active_tasks = check_active_export_tasks(consumer)
            
            if active_tasks:
                logger.data(f"Discovered {len(active_tasks)} energetic export activity(s). Ready for completion...")
                if not wait_for_export_task_completion(consumer, MAX_WAIT_MINUTES, CHECK_INTERVAL):
                    return {
                        'statusCode': 409,
                        'physique': {
                            'error': f'Energetic export activity(s) nonetheless working after {MAX_WAIT_MINUTES} minutes',
                            'activeTaskCount': len(active_tasks)
                        }
                    }
        
        # Create export activity with complete error dealing with
        attempt:
            response = consumer.create_export_task(
                logGroupName=GROUP_NAME,
                fromTime=fromDate,
                to=toDate,
                vacation spot=DESTINATION_BUCKET,
                destinationPrefix=BUCKET_PREFIX
            )
            
            task_id = response.get('taskId', 'Unknown')
            logger.data(f"Export activity created efficiently with ID: {task_id}")
            
            return {
                'statusCode': 200,
                'physique': {
                    'message': 'Export activity created efficiently',
                    'taskId': task_id,
                    'logGroup': GROUP_NAME,
                    'fromDate': StartDate.isoformat(),
                    'toDate': EndDate.isoformat(),
                    'vacation spot': f"s3://{DESTINATION_BUCKET}/{BUCKET_PREFIX}"
                }
            }
            
        besides ClientError as e:
            error_code = e.response['Error']['Code']
            error_msg = e.response['Error']['Message']
            
            # Deal with particular AWS errors
            if error_code == 'ResourceNotFoundException':
                logger.error(f"Log group '{GROUP_NAME}' not discovered")
                return {
                    'statusCode': 404,
                    'physique': {'error': f"Log group '{GROUP_NAME}' not discovered"}
                }
            elif error_code == 'LimitExceededException':
                # That is the concurrent export activity error
                logger.error(f"Export activity restrict exceeded (concurrent activity working): {error_msg}")
                
                # Test if there are energetic duties to supply extra context
                active_tasks = check_active_export_tasks(consumer)
                
                return {
                    'statusCode': 409,
                    'physique': {
                        'error': 'Can't create export activity: One other export activity is already working',
                        'particulars': error_msg,
                        'activeTaskCount': len(active_tasks),
                        'suggestion': 'Just one export activity can run at a time. Please anticipate the present activity to finish or set RETRY_ON_CONCURRENT=true to auto-retry.'
                    }
                }
            elif error_code == 'InvalidParameterException':
                logger.error(f"Invalid parameter: {error_msg}")
                return {
                    'statusCode': 400,
                    'physique': {'error': f"Invalid parameter: {error_msg}"}
                }
            elif error_code == 'AccessDeniedException':
                logger.error(f"Entry denied: {error_msg}")
                return {
                    'statusCode': 403,
                    'physique': {'error': f"Entry denied: {error_msg}"}
                }
            else:
                logger.error(f"AWS ClientError ({error_code}): {error_msg}")
                return {
                    'statusCode': 500,
                    'physique': {'error': f"AWS error: {error_msg}"}
                }
                
        besides BotoCoreError as e:
            error_msg = f"BotoCore error: {str(e)}"
            logger.error(error_msg)
            return {
                'statusCode': 500,
                'physique': {'error': error_msg}
            }
            
        besides Exception as e:
            error_msg = f"Surprising error creating export activity: {str(e)}"
            logger.error(error_msg)
            return {
                'statusCode': 500,
                'physique': {'error': error_msg}
            }
    
    besides Exception as e:
        # Catch-all for any surprising errors
        error_msg = f"Surprising error in lambda_handler: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return {
            'statusCode': 500,
            'physique': {'error': 'Inside server error'}
        }

  1. Zip the file utilizing the next command:

zip perform.zip lambda_function.py

Create Lambda perform

Full the next steps to create a Lambda perform:

  1. Hook up with a terminal with the AWS CLI or CloudShell.
  2. Run the next command, which references the zip file beforehand created:
aws lambda create-function 
--function-name  
--runtime python 3.11
--handler lambda_function.lambda_handler 
--role arn:aws:iam:::function/Export-RDS-CloudWatch-to-S3-Lambda 
--zip-file fileb://perform.zip 
--memory-size 128 
--timeout 15 
--architectures x86_64 
--environment "Variables={DESTINATION_BUCKET=,GROUP_NAME=/aws/rds/occasion/check/audit,PREFIX=exported-logs,NDAYS=1}" 

The NDAYS variable within the previous command will decide the dates of audit logs exported per invocation of the Lambda perform. For instance, in case you plan on exporting logs one time per day to Amazon S3, set NDAYS=1, as proven within the previous command.

  1. Add concurrency limits to maintain executions in management:
aws lambda put-function-concurrency --function-name  
    --reserved-concurrent-executions 2

Word: Reserved concurrency in Lambda units a set restrict on what number of cases of your perform can run concurrently, like having a selected variety of staff for a activity. On this database export situation, we’re limiting it to 2 concurrent executions to forestall overwhelming the database, keep away from API throttling, and guarantee clean, managed exports. This limitation helps preserve system stability, prevents useful resource rivalry, and retains prices in verify

On this part, we created a Lambda perform that can deal with the CloudWatch log exports, configured its important parameters together with surroundings variables, and set a concurrency restrict to make sure managed execution. Subsequent, we’ll create an EventBridge schedule that can routinely set off this Lambda perform at specified intervals to carry out the log exports.

Create EventBridge schedule

Full the next steps to create an EventBridge schedule to invoke the Lambda perform at an interval of your selecting:

  1. Hook up with a terminal with the AWS CLI or CloudShell.
  2. Run the next command:
aws scheduler create-schedule 
  --name  
  --description  
  --schedule-expression "fee(1 day)"  
  --state ENABLED 
  --target "{"Arn":"arn:aws:lambda:: :perform:”,”RoleArn":"","RetryPolicy":{"MaximumEventAgeInSeconds":86400,"MaximumRetryAttempts":0}}" 
  --flexible-time-window "{"Mode":"OFF"}" 
  --action-after-completion "NONE" 
  --time-zone "America/New_York" 
  --region 

The schedule-expression parameter within the previous command should be equal to the environmental variable NDAYS within the beforehand created Lambda perform.

This answer gives an environment friendly, scheduled strategy to exporting RDS audit logs to Amazon S3 utilizing AWS Lambda and EventBridge Scheduler. By leveraging these serverless parts, we’ve created a cheap, automated system that periodically transfers audit logs to S3 for long-term storage and evaluation. This methodology is especially helpful for organizations that want common, batch-style exports of their database audit logs, permitting for simpler compliance reporting and historic knowledge evaluation.

Whereas the primary answer affords a scheduled, batch-processing strategy, some eventualities require a extra real-time answer for audit log processing. In our subsequent answer, we’ll discover how you can create a close to real-time audit log processing system utilizing Amazon Kinesis Information Firehose. This strategy will enable for steady streaming of audit logs from RDS to S3, offering nearly rapid entry to log knowledge.

Answer 2: Create close to real-time audit log processing with Amazon Information Firehose

On this part, we evaluation how you can create a close to real-time audit log export to Amazon S3 utilizing the facility of Information Firehose. With this answer, you possibly can immediately load the most recent audit log recordsdata to an S3 bucket for fast evaluation, manipulation, or different functions.

Create IAM function for CloudWatch Logs

Step one is to create an IAM function answerable for permitting CloudWatch Logs to place information into the Firehose supply stream (CWLtoDataFirehoseRole). Full the next steps to create this function:

  1. Hook up with a terminal with the AWS CLI or CloudShell.
  2. Create and write to a JSON file for the IAM belief coverage utilizing your most popular textual content editor:

nano TrustPolicyForCWL.json

  1. Insert the next belief coverage into the JSON file:
{
  "Assertion": {
    "Impact": "Enable",
    "Principal": { "Service": "logs.amazonaws.com" },
    "Motion": "sts:AssumeRole",
    "Situation": { 
         "StringLike": { 
             "aws:SourceArn": "arn:aws:logs:::*"
         } 
     }
  }
}

  1. Create and write to a brand new JSON file for the IAM permissions coverage utilizing your most popular textual content editor:

nano PermissionsForCWL.json

  1. Insert the next permissions into the JSON file:
{
    "Assertion":[
      {
        "Effect":"Allow",
        "Action":["firehose:PutRecord"],
        "Useful resource":[
            "arn:aws:firehose:region:account-id:deliverystream/"]
      }
    ]
}

  1. Use the next AWS CLI command to create the IAM function for CloudWatch Logs to insert information into the Firehose supply stream:
aws iam create-role 
--role-name CWLtoDataFirehoseRole 
--assume-role-policy-document file://TrustPolicyForCWL.json

  1. Create the IAM coverage and fix it to the beforehand created IAM function:
aws iam put-role-policy --role-name CWLtoDataFirehoseRole --policy-name Permissions-Coverage-For-CWL --policy-document file://PermissionsForCWL.json

Create IAM function for Firehose supply stream

The following step is to create an IAM function (DataFirehosetoS3Role) answerable for permitting the Firehose supply stream to insert the audit logs into an S3 bucket. Full the next steps to create this function:

  1. Hook up with a terminal with the AWS CLI or CloudShell.
  2. Create and write to a JSON file for the IAM belief coverage utilizing your most popular textual content editor:

nano PermissionsForCWL.json

  1. Insert the next belief coverage into the JSON file:
{
  "Assertion": {
    "Impact": "Enable",
    "Principal": { "Service": "firehose.amazonaws.com" },
    "Motion": "sts:AssumeRole"
    } 
}

  1. Create and write to a brand new JSON file for the IAM permissions utilizing your most popular textual content editor:

nano PermissionsForCWL.json

  1. Insert the next permissions into the JSON file:
{
  "Assertion": [
    {
      "Effect": "Allow",
      "Action": [ 
          "s3:AbortMultipartUpload", 
          "s3:GetBucketLocation", 
          "s3:GetObject", 
          "s3:ListBucket", 
          "s3:ListBucketMultipartUploads", 
          "s3:PutObject" ],
      "Useful resource": [ 
          "arn:aws:s3:::", 
          "arn:aws:s3:::/*" ]
    }
  ]
}

  1. Use the next AWS CLI command to create the IAM function for Information Firehose to carry out operations on the S3 bucket:
aws iam create-role 
--role-name DataFirehosetoS3Role 
--assume-role-policy-document file://TrustPolicyForFirehose.json

  1. Create the IAM coverage and fix it to the beforehand created IAM function:
aws iam put-role-policy --role-name DataFirehosetoS3Role --policy-name Permissions-Coverage-For-Firehose --policy-document file://PermissionsForFirehose.json

Create the Firehose supply stream

Now you create the Firehose supply stream to permit close to real-time switch of MySQL audit logs from CloudWatch Logs to your S3 bucket. Full the next steps:

  1. Create the Firehose supply stream with the next AWS CLI command. Setting the buffer interval and dimension determines how lengthy your knowledge is buffered earlier than being delivered to the S3 bucket. For extra data, seek advice from AWS documentation. On this instance, we use the default values:
aws firehose create-delivery-stream 
   --delivery-stream-name '' 
   --s3-destination-configuration 
  '{"RoleARN": "arn:aws:iam:::function/DataFirehosetoS3Role", "BucketARN": "arn:aws:s3:::"}'

  1. Wait till the Firehose supply stream turns into energetic (this would possibly take a couple of minutes). You should use the Firehose CLI describe-delivery-stream command to verify the standing of the supply stream. Word the DeliveryStreamDescription.DeliveryStreamARN worth, to make use of in a later step:

aws firehose describe-delivery-stream --delivery-stream-name

  1. After the Firehose supply stream is in an energetic state, create a CloudWatch Logs subscription filter. This subscription filter instantly begins the circulate of close to real-time log knowledge from the chosen log group to your Firehose supply stream. Ensure to supply the log group title that you simply wish to push to Amazon S3 and correctly copy the destination-arn of your Firehose supply stream:
aws logs put-subscription-filter 
    --log-group-name  
    --filter-name "Vacation spot" 
    --filter-pattern “” 
    --destination-arn "arn:aws:firehose:area:accountID:deliverystream/" 
    --role-arn "arn:aws:iam::accountID:function/CWLtoDataFirehoseRole"

Your close to real-time MySQL audit log answer is now correctly configured and can start delivering MySQL audit logs to your S3 bucket via the Firehose supply stream.

Clear up

To wash up your assets, full the next steps (relying on which answer you used):

  1. Delete the RDS occasion or Aurora cluster.
  2. Delete the Lambda features.
  3. Delete the EventBridge rule.
  4. Delete the S3 bucket.
  5. Delete the Firehose supply stream.

Conclusion

On this put up, we’ve offered two options for managing Aurora MySQL or RDS for MySQL audit logs, every providing distinctive advantages for various enterprise use instances.

We encourage you to implement these options in your individual surroundings and share your experiences, challenges, and success tales within the feedback part. Your suggestions and real-world implementations might help fellow AWS customers select and adapt these options to greatest match their particular audit logging wants.


Concerning the authors

Mahek Shah

Mahek Shah

Mahek is a Cloud Help Engineer I who has labored throughout the AWS database workforce for nearly 2 years. Mahek is an Amazon Aurora MySQL and RDS MySQL material skilled with deep experience in serving to prospects implement strong, high-performing, and safe database options throughout the AWS Cloud.

Ryan Moore

Ryan Moore

Ryan is a Technical Account Supervisor at AWS with three years of expertise, having launched his profession on the AWS database workforce. He’s an Aurora MySQL and RDS MySQL material skilled that focuses on enabling prospects to construct performant, scalable, and safe architectures throughout the AWS Cloud.

Nirupam Datta

Nirupam Datta

Nirupam is a Sr. Technical Account Supervisor at AWS. He has been with AWS for over 6 years. With over 14 years of expertise in database engineering and infra-architecture, Nirupam can also be an issue skilled within the Amazon RDS core techniques and Amazon RDS for SQL Server. He gives technical help to prospects, guiding them emigrate, optimize, and navigate their journey within the AWS Cloud.

Related Articles

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Latest Articles