We believe the most successful organizations will be those that view AI not just as a tool for automation, but as a catalyst for transforming how they approach software development entirely. The real strategic advantage will come from reimagining software development processes and culture to fully leverage AI’s capabilities. This includes rethinking traditional metrics, redefining developer productivity, and creating space and cultural change for teams to experiment with new ways of working.
This powerful observation from our April 2025 blog post “How generative AI is transforming developer workflows at Amazon” is already proving true in practice. Organizations using Amazon Q Developer are actively implementing new metrics to understand how developers leverage AI features. This data-driven approach helps them identify usage patterns, uncover areas for improvement, and recognize internal champions who drive adoption. It takes time and practice to get comfortable with prompting and understand the capabilities of new tools. I have identified three questions that customers ask to measure and evaluate their Amazon Q Developer adoption.
- How many Q Developer active users are there?
- How can we track usage trends?
- Who are our power users?
This blog post explores the Amazon Q Developer monitoring and tracking tools and shows how, together, they provide a comprehensive view of developer usage that answers the questions listed above.
Subscription Management
The Amazon Q Developer subscription console serves as your primary tool for managing Q Developer subscriptions. The How to identify inactive users of Amazon Q Developer blog details license activity and shows how to navigate the Amazon Q Developer console, where you can download a report listing users from all accounts in your AWS organization, their status (active, pending, or canceled), and their last activity date. This document from our user guide walks you through enabling organization-wide visibility from your AWS Organizations management account.
The table below contains an extract of the CSV file created when you click on Download total users reports from the Amazon Q Developer Subscription Console page.
| Name | Subscription type | Subscription status | Identity provider user ID | Last activity date |
|------|-------------------|---------------------|---------------------------|--------------------|
| anakin | Group | Pending | 14a81418-b0b1-70b7-c73a | N/A |
| artur | Group | Active | 34a87408-6091-7054-4b60 | May 11 2025 |
| ashoka | Individual | Pending | 64d8a448-9021-7020-c73d | N/A |
| uther | Group | Active | 94e844b8-4031-7022-303d | April 23 2025 |
| John.Smith | Group | Pending | e4c84458-30b1-7097-2496 | N/A |
| luke | Group | Pending | f4a89418-c0e1-705b-d050 | N/A |
Table 1 – CSV extract of total users report
This report displays Amazon Q Developer Pro user subscriptions, including subscription details, status, and last recorded activity dates, which we will join with usage metrics later in this post to generate insights.
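If you want a quick count of active users from this report without opening a spreadsheet, you can load the CSV with pandas. The following is a minimal sketch, assuming the file was downloaded locally as total-users-report.csv (a placeholder name) and contains the columns shown in Table 1.
import pandas as pd

# "total-users-report.csv" is a placeholder; use the file name you downloaded
# from the Amazon Q Developer subscription console.
report = pd.read_csv("total-users-report.csv")

# Count subscriptions by status (Active, Pending, Canceled).
print(report["Subscription status"].value_counts())

# List active users sorted by their last recorded activity date.
active = report[report["Subscription status"] == "Active"].copy()
active["Last activity date"] = pd.to_datetime(active["Last activity date"], errors="coerce")
print(active.sort_values("Last activity date", ascending=False)[["Name", "Last activity date"]])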
Q Developer dashboard usage metrics
The Amazon Q Developer dashboard summarizes data about how your Pro tier subscribers use the service. Most metrics are generated and displayed on an hourly basis; the only exception is the Active users widget, which is updated daily according to Coordinated Universal Time (UTC). The dashboard shows metrics collected from users who are subscribed in the AWS account that you’re currently signed in to.
The dashboard exposes many usage metrics that help administrators monitor activity; detailed information on each metric is available in our documentation.
User Activity Reporting
The user activity reports in Amazon Q Developer provide metrics that detail how users interact with the service. To use them, you need to enable the feature and define an Amazon S3 bucket to save the CSV reports. Amazon Q Developer generates the report every day at midnight UTC and saves it into the designated bucket. Each row in the CSV file represents a user who interacted with Amazon Q Developer that day, and each column shows a metric as described in the User activity report metrics. These metrics are calculated based on the user telemetry collected over the course of the day. Instructions on how to enable and configure the User Activity Report Metrics can be found in our documentation.
Creating a per-user monthly report
To get started, sign in to the AWS account from which you want to export your metrics and open the Amazon Q Developer console. Select “Settings” and then click the “Edit” button for “Amazon Q Developer Usage Activity,” which is turned off by default.

Image 1 – Amazon Q Developer usage activity setting disabled
When prompted, enable “Collect granular metrics per user” and define the target location as s3://bucket/prefix. In my example, I pointed it to s3://q-dev-user-activity-<account-number>/csv

Image 2 – Amazon Q Developer usage activity setting enabled
The CSV files are saved under the following directory structure:
s3://bucketName/prefix/AWSLogs/accountId/QDeveloperLogs/by_user_analytic/region/year/month/day/utc-hour/
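Before processing the reports, you may want to confirm that the daily CSV files are actually being delivered. The snippet below is a minimal sketch that lists one month of objects with boto3; the bucket name, account ID, Region, and date values are placeholders that you should replace with your own.
import boto3

# Placeholder values; substitute your own bucket, prefix, account ID, Region, and month.
bucket = "q-dev-user-activity-123456789012"
prefix = "csv/AWSLogs/123456789012/QDeveloperLogs/by_user_analytic/us-east-1/2025/03/"

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")

# Print every daily user activity CSV delivered for the month.
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get("Contents", []):
        if obj["Key"].endswith(".csv"):
            print(obj["Key"], obj["Size"])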
How to process the User Activity Report
I developed a script, process_metrics.py, a Python utility that processes Amazon Q Developer user activity data and subscription information stored in Amazon S3 and combines them into monthly reports in CSV format. It retrieves data from S3, maps user IDs to names, aggregates metrics by user and month, and generates clean, structured CSV reports ready for your preferred analytics platform, using pandas for data manipulation. The script derives the S3 directory structure from the current year, Region, and AWS account ID; all you need to provide is the bucket name and the prefix you configured for the CSV files. The script is shared below.
"""
Amazon Q Metrics Processor
This script processes user activity data and subscription information from S3,
combines them, and generates monthly CSV reports.
"""
import os
import boto3
import botocore
from botocore.config import Config
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from io import StringIO
import re
import argparse
from pathlib import Path
import sys
from urllib.parse import urlparse
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('q-metrics')
def escape_log_data(data):
"""
Sanitize data for secure logging to prevent log injection (CWE-117, CWE-93).
Args:
data: The data to be sanitized for logging
Returns:
str: Sanitized string safe for logging
"""
if data is None:
return 'None'
data_str = str(data)
data_str = data_str.replace('\n', '\\n').replace('\r', '\\r')
result = ''
for char in data_str:
if ord(char) < 32 or ord(char) == 127:
result += f'\\x{ord(char):02x}'
else:
result += char
return result
def validate_s3_bucket_name(bucket_name):
"""
Validate S3 bucket name according to AWS naming rules.
Args:
bucket_name (str): The bucket name to validate
Returns:
bool: True if valid, False otherwise
"""
if not bucket_name or not isinstance(bucket_name, str):
return False
if len(bucket_name) < 3 or len(bucket_name) > 63:
return False
if not re.match(r'^[a-z0-9][a-z0-9\.-]*[a-z0-9]$', bucket_name):
return False
if '..' in bucket_name:
return False
if re.match(r'^\d+\.\d+\.\d+\.\d+$', bucket_name):
return False
try:
if bucket_name.startswith('xn--') or bucket_name.endswith('-s3alias'):
return False
except ValueError:
return False
return True
def validate_s3_path(path):
"""
Validate an S3 path to ensure it doesn't contain dangerous characters.
Args:
path (str): The S3 path to validate
Returns:
bool: True if valid, False otherwise
"""
if not path or not isinstance(path, str):
return False
if '..' in path:
return False
try:
if re.search(r'[:"|?*\x00-\x1F]', path):
return False
except ValueError:
return False
return True
def validate_month_format(month_str):
"""
Validate that a month string is in the format YYYY-MM.
Args:
month_str (str): The month string to validate
Returns:
bool: True if valid, False otherwise
"""
if not month_str:
return True
if not isinstance(month_str, str):
return False
if not re.match(r'^[0-9]{4}-[0-9]{2}$', month_str):
return False
try:
year, month = month_str.split('-')
month_num = int(month)
if month_num < 1 or month_num > 12:
return False
except (ValueError, TypeError):
return False
return True
def sanitize_output_path(path):
"""
Sanitize and normalize an output directory path.
Args:
path (str): The path to sanitize
Returns:
str: The sanitized absolute path
"""
if not path or not isinstance(path, str):
return os.path.abspath('./output')
safe_path = Path(path).resolve()
try:
if not safe_path.is_absolute():
safe_path = Path.cwd() / safe_path
except (ValueError, TypeError):
logger.warning("Invalid path: %s, using default", escape_log_data(repr(path)))
return os.path.abspath('./output')
return str(safe_path)
def validate_csv_content(df, expected_columns, file_type):
"""
Validate that a DataFrame has the expected columns and structure.
Args:
df (DataFrame): The pandas DataFrame to validate
expected_columns (list): List of required column names
file_type (str): Type of file for logging purposes
Returns:
bool: True if valid, False otherwise
"""
if df is None or df.empty:
logger.error("Empty %s data", file_type)
return False
missing_columns = [col for col in expected_columns if col not in df.columns]
if missing_columns:
logger.error("Missing required columns in %s data: %s", file_type, ', '.join(missing_columns))
return False
for col in expected_columns:
if df[col].isna().all():
logger.error("Column '%s' in %s data is completely empty", col, file_type)
return False
if len(df) == 0:
logger.error("No data rows in %s data", file_type)
return False
if df.duplicated().any():
dup_count = df.duplicated().sum()
logger.warning("Found %d duplicate rows in %s data", dup_count, file_type)
return True
def validate_subscription_data(df):
"""
Validate subscription data format and content.
Args:
df (DataFrame): The subscription data DataFrame
Returns:
bool: True if valid, False otherwise
"""
required_columns = [
'Name',
'Subscription type',
'Subscription status',
'Identity provider user ID'
]
return validate_csv_content(df, required_columns, "subscription")
def validate_activity_data(df):
"""
Validate activity data format and content.
Args:
df (DataFrame): The activity data DataFrame
Returns:
bool: True if valid, False otherwise
"""
required_columns = ['UserId', 'Date']
if not validate_csv_content(df, required_columns, "activity"):
return False
metric_cols = df.select_dtypes(include=[np.number]).columns
if len(metric_cols) == 0:
logger.error("No metric columns found in activity data")
return False
try:
valid_dates = 0
total_dates = len(df['Date'])
for date_str in df['Date']:
if parse_date(date_str) is not None:
valid_dates += 1
if valid_dates == 0:
logger.error("No valid dates found in activity data")
return False
elif valid_dates < total_dates:
logger.warning("Only %d out of %d dates are valid in activity data", valid_dates, total_dates)
except Exception as e:
logger.error("Error validating dates in activity data: %s", str(e))
return False
return True
def parse_args():
"""Parse command line arguments with enhanced security validation."""
parser = argparse.ArgumentParser(description='Process Amazon Q metrics data')
parser.add_argument('--bucket', required=True, help='S3 bucket name')
parser.add_argument('--prefix', required=True, help='Initial S3 prefix (e.g., "logs")')
parser.add_argument('--subscription-path', required=True,
help='S3 path to subscription file (e.g., user-activities/subscriptions.csv)')
parser.add_argument('--output-dir', default='./output', help='Local directory for output files')
parser.add_argument('--month', help='Process only a specific month (format: YYYY-MM, e.g., 2025-01)')
parser.add_argument('--per-user', action='store_true', help='Generate individual reports for each user')
parser.add_argument('--user', help='Generate report for a specific user (by name)')
args = parser.parse_args()
if not validate_s3_bucket_name(args.bucket):
logger.error("Invalid S3 bucket name: %r", escape_log_data(args.bucket))
sys.exit(1)
if re.search(r'[;&|`$]', args.bucket):
logger.error("Potentially malicious characters in bucket name: %r", escape_log_data(args.bucket))
sys.exit(1)
try:
sts_client = boto3.client('sts')
account_id = sts_client.get_caller_identity()['Account']
session = boto3.session.Session()
region = session.region_name or os.environ.get('AWS_REGION', 'us-east-1')
supported_regions = ['us-east-1', 'eu-central-1']
if region not in supported_regions:
logger.error("Region %s is not supported. This solution currently only works in %s",
escape_log_data(region), ' or '.join(supported_regions))
sys.exit(1)
current_year = str(datetime.now().year)
if args.month:
try:
year, _ = args.month.split('-')
current_year = year
except (ValueError, TypeError):
pass
activity_prefix = f"{args.prefix}/AWSLogs/{account_id}/QDeveloperLogs/by_user_analytic/{region}/{current_year}"
logger.info("Derived activity prefix: %s", escape_log_data(activity_prefix))
except Exception as e:
logger.error("Failed to derive activity prefix: %s", escape_log_data(str(e)))
sys.exit(1)
for path_arg, path_name in [(activity_prefix, "activity prefix"),
(args.subscription_path, "subscription path")]:
if not validate_s3_path(path_arg):
logger.error("Invalid S3 %s: %r", path_name, escape_log_data(path_arg))
sys.exit(1)
if re.search(r'[;&|`$]', path_arg):
logger.error("Potentially malicious characters in %s: %r", path_name, escape_log_data(path_arg))
sys.exit(1)
if any(pattern in path_arg for pattern in ['../', '..\\', '../', '..\\']):
logger.error("Path traversal attempt detected in %s: %r", path_name, escape_log_data(path_arg))
sys.exit(1)
if args.month:
if not validate_month_format(args.month):
logger.error("Invalid month format: %r. Expected format: YYYY-MM (e.g., 2025-01)", escape_log_data(args.month))
sys.exit(1)
try:
year, month = args.month.split('-')
year_num = int(year)
month_num = int(month)
current_year = datetime.now().year
if year_num < current_year - 5 or year_num > current_year + 1:
logger.warning("Month year %d is outside the reasonable range (%d-%d)",
year_num, current_year - 5, current_year + 1)
except (ValueError, TypeError):
pass
if args.user:
if len(args.user) > 100:
logger.error("User name too long: %d characters (max 100)", len(args.user))
sys.exit(1)
if re.search(r'[;&|`$]', args.user):
logger.error("Potentially malicious characters in user name: %r", escape_log_data(args.user))
sys.exit(1)
args.output_dir = sanitize_output_path(args.output_dir)
try:
output_path = Path(args.output_dir)
if output_path.exists() and not output_path.is_dir():
logger.error("Output path exists but is not a directory: %r", escape_log_data(args.output_dir))
sys.exit(1)
parent_dir = output_path if output_path.exists() else output_path.parent
if not os.access(parent_dir, os.W_OK):
logger.error("No write permission for output directory: %r", escape_log_data(args.output_dir))
sys.exit(1)
except Exception as e:
logger.error("Error validating output directory: %s", escape_log_data(str(e)))
sys.exit(1)
logger.info("Using output directory: %r", escape_log_data(args.output_dir))
args.activity_prefix = activity_prefix
return args
def get_s3_client():
"""Create and return an S3 client with proper configuration."""
try:
config = Config(
connect_timeout=5,
read_timeout=30,
retries={'max_attempts': 3},
signature_version='s3v4'
)
s3 = boto3.client('s3', config=config)
try:
s3.head_bucket(Bucket='aws-sdk-resources')
except botocore.exceptions.ClientError as e:
error_code = e.response.get('Error', {}).get('Code', '')
if error_code == '403':
logger.warning("AWS credentials appear valid but have insufficient permissions")
elif error_code == '401':
logger.error("Invalid AWS credentials")
sys.exit(1)
except Exception as e:
logger.warning("Error testing S3 client: %s", str(e))
return s3
except Exception as e:
logger.error("Failed to create S3 client: %s", str(e))
sys.exit(1)
def list_activity_files(s3_client, bucket, prefix, month=None):
"""List all activity files in the specified S3 bucket and prefix.
If month is specified, only return files for that month (format: YYYY-MM).
Args:
s3_client: The boto3 S3 client
bucket (str): The S3 bucket name
prefix (str): The S3 prefix for activity files
month (str, optional): Month in YYYY-MM format
Returns:
list: List of S3 keys for activity files
"""
if not validate_s3_bucket_name(bucket):
logger.error("Invalid bucket name: %r", escape_log_data(bucket))
return []
if not validate_s3_path(prefix):
logger.error("Invalid S3 prefix: %r", escape_log_data(prefix))
return []
if month and not validate_month_format(month):
logger.error("Invalid month format: %r", escape_log_data(month))
return []
prefix = prefix.replace('..', '').replace('//', '/')
logger.info("Listing activity files in s3://%s/%s", escape_log_data(bucket), escape_log_data(prefix))
activity_files = []
paginator = s3_client.get_paginator('list_objects_v2')
try:
search_prefix = prefix
if month:
try:
year, month_num = month.split('-')
if not prefix.endswith(year + '/'):
search_prefix = f"{prefix}{month_num}/"
else:
search_prefix = f"{prefix}{month_num}/"
logger.info("Filtering for month %s using prefix: %s", escape_log_data(month), escape_log_data(search_prefix))
except ValueError:
logger.error("Failed to parse month %r for filtering", escape_log_data(month))
return []
for page in paginator.paginate(Bucket=bucket, Prefix=search_prefix):
if 'Contents' in page:
for obj in page['Contents']:
key = obj['Key']
if not key.endswith('/') and key.endswith('.csv'):
activity_files.append(key)
logger.info("Found %d activity files", len(activity_files))
return activity_files
except Exception as e:
logger.error("Error listing activity files: %s", escape_log_data(str(e)))
if isinstance(e, s3_client.exceptions.NoSuchBucket):
logger.error("Bucket %r does not exist", escape_log_data(bucket))
elif isinstance(e, botocore.exceptions.ClientError) and e.response.get('Error', {}).get('Code') == 'AccessDenied':
logger.error("Access denied to bucket %r", escape_log_data(bucket))
return []
def read_csv_from_s3(s3_client, bucket, key):
"""Read a CSV file from S3 into a pandas DataFrame.
Args:
s3_client: The boto3 S3 client
bucket (str): The S3 bucket name
key (str): The S3 object key
Returns:
DataFrame or None: Pandas DataFrame with CSV content or None if error
"""
if not validate_s3_bucket_name(bucket):
logger.error("Invalid bucket name: %r", escape_log_data(bucket))
return None
if not validate_s3_path(key):
logger.error("Invalid S3 key: %r", escape_log_data(key))
return None
logger.info("Reading s3://%s/%s", escape_log_data(bucket), escape_log_data(key))
try:
try:
head_response = s3_client.head_object(Bucket=bucket, Key=key)
file_size_bytes = head_response.get('ContentLength', 0)
MAX_FILE_SIZE_BYTES = 100 * 1024 * 1024
if file_size_bytes > MAX_FILE_SIZE_BYTES:
logger.error("File %r size (%d bytes) exceeds the maximum allowed size (%d bytes)",
escape_log_data(key), file_size_bytes, MAX_FILE_SIZE_BYTES)
return None
except Exception as e:
logger.warning("Could not check file size for %r: %s", escape_log_data(key), escape_log_data(str(e)))
response = s3_client.get_object(Bucket=bucket, Key=key)
content = response['Body'].read().decode('utf-8')
if any(suspicious_pattern in content for suspicious_pattern in ['=cmd|', '=cmd:', '@cmd', '+cmd', '-cmd', '=DDE', '=SUM(', '=HYPERLINK(']):
logger.error("Potential CSV injection detected in %r", escape_log_data(key))
return None
csv_buffer = StringIO(content)
try:
row_count = sum(1 for _ in csv_buffer) - 1
csv_buffer.seek(0)
MAX_ROWS = 500000
if row_count > MAX_ROWS:
logger.error("CSV file %r has %d rows, which exceeds the maximum limit of %d rows",
escape_log_data(key), row_count, MAX_ROWS)
return None
df = pd.read_csv(csv_buffer, converters={col: str for col in ['Name', 'UserId', 'Subscription', 'Subscription type', 'Subscription status']})
memory_usage_bytes = df.memory_usage(deep=True).sum()
MAX_MEMORY_USAGE = 500 * 1024 * 1024
if memory_usage_bytes > MAX_MEMORY_USAGE:
logger.error("DataFrame for %r uses %d bytes of memory, exceeding the limit of %d bytes",
escape_log_data(key), memory_usage_bytes, MAX_MEMORY_USAGE)
del df
return None
if len(df) > 100000:
logger.warning("CSV file %r has %d rows, which exceeds the recommended limit", escape_log_data(key), len(df))
return df
except pd.errors.ParserError as e:
logger.error("Error parsing CSV %r: %s", escape_log_data(key), escape_log_data(str(e)))
return None
except Exception as e:
logger.error("Error reading %r: %s", escape_log_data(key), escape_log_data(str(e)))
return None
def parse_date(date_str):
"""Parse date strings into datetime objects."""
if pd.isna(date_str) or date_str == 'N/A':
return None
date_formats = [
'%m-%d-%Y',
'%B %d %Y',
]
for fmt in date_formats:
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
logger.warning("Could not parse date: %r", escape_log_data(date_str))
return None
def extract_month_year(date_obj):
"""Extract month and year from a datetime object."""
if date_obj is None:
return None
try:
return f"{date_obj.year}-{int(date_obj.month):02d}"
except (AttributeError, ValueError, TypeError) as e:
logger.debug("Failed to extract month/year from date object: %s", escape_log_data(str(e)))
return None
def process_subscription_data(df):
"""Process subscription data."""
logger.info("Processing subscription data")
if not validate_subscription_data(df):
logger.error("Invalid subscription data format")
return None, {}
df_clean = df.copy()
if 'Subscription' not in df_clean.columns:
df_clean['Subscription'] = df_clean['Subscription type']
logger.info("Added 'Subscription' column based on 'Subscription type'")
id_to_name_map = dict(zip(df_clean['Identity provider user ID'], df_clean['Name']))
df_clean['Last activity date'] = df_clean['Last activity date'].apply(parse_date)
df_clean['Last activity month'] = df_clean['Last activity date'].apply(extract_month_year)
return df_clean, id_to_name_map
def process_activity_data(activity_dfs, id_to_name_map):
"""Process and combine activity data."""
logger.info("Processing activity data")
if not activity_dfs:
logger.warning("No activity data to process")
return pd.DataFrame()
valid_dfs = []
for i, df in enumerate(activity_dfs):
if validate_activity_data(df):
valid_dfs.append(df)
else:
logger.warning("Skipping invalid activity data file #%d", i+1)
if not valid_dfs:
logger.error("No valid activity data files found")
return pd.DataFrame()
try:
combined_df = pd.concat(valid_dfs, ignore_index=True)
combined_df['Date'] = combined_df['Date'].apply(parse_date)
combined_df['Month'] = combined_df['Date'].apply(extract_month_year)
combined_df['Name'] = combined_df['UserId'].map(id_to_name_map)
unmapped_ids = combined_df[combined_df['Name'].isna()]['UserId'].unique()
if len(unmapped_ids) > 0:
logger.warning("Found %d user IDs without matching names", len(unmapped_ids))
if len(unmapped_ids) <= 10:
logger.warning("Unmapped user IDs: %s", escape_log_data(', '.join(map(str, unmapped_ids))))
return combined_df
except Exception as e:
logger.error("Error processing activity data: %s", escape_log_data(str(e)))
return pd.DataFrame()
def merge_data(activity_df, subscription_df):
"""Merge activity data with subscription details on user name.
Note: minimal implementation; adjust the merge keys to your data layout.
"""
logger.info("Merging activity and subscription data")
if activity_df is None or activity_df.empty:
return pd.DataFrame()
if subscription_df is None or subscription_df.empty:
return activity_df
sub_cols = [col for col in ['Name', 'Subscription', 'Subscription status'] if col in subscription_df.columns]
return activity_df.merge(subscription_df[sub_cols], on='Name', how='left')
def aggregate_by_month(merged_df):
"""Aggregate numeric metrics by user and month.
Note: minimal implementation; sums every numeric metric column.
"""
logger.info("Aggregating data by user and month")
if merged_df is None or merged_df.empty:
return pd.DataFrame()
metric_cols = merged_df.select_dtypes(include=[np.number]).columns.tolist()
return merged_df.groupby(['Name', 'Month'], as_index=False)[metric_cols].sum()
def generate_monthly_reports(agg_df, output_dir, specific_month=None):
"""Generate per-month CSV reports.
If specific_month is provided, only generate a report for that month.
Args:
agg_df (DataFrame): Aggregated data frame
output_dir (str): Output directory path
specific_month (str, optional): Month in YYYY-MM format
Returns:
int: Number of reports generated
"""
logger.info("Generating monthly reports")
output_dir = sanitize_output_path(output_dir)
try:
os.makedirs(output_dir, exist_ok=True)
except (PermissionError, OSError) as e:
logger.error("Cannot create output directory %s: %s", escape_log_data(output_dir), escape_log_data(str(e)))
return 0
if agg_df.empty or 'Month' not in agg_df.columns:
logger.warning("No data available to generate monthly reports")
return 0
months = agg_df['Month'].unique()
# Cap the number of monthly reports generated in a single run (assumed limit).
MAX_REPORTS = 24
if len(months) > MAX_REPORTS:
logger.warning("Too many months (%d). Limiting to %d most recent months.",
len(months), MAX_REPORTS)
months = sorted(months, reverse=True)[:MAX_REPORTS]
reports_generated = 0
for month in months:
if pd.isna(month):
logger.debug("Skipping null month value")
continue
if specific_month and month != specific_month:
continue
if not validate_month_format(month):
logger.warning("Skipping invalid month format: %r", escape_log_data(month))
continue
try:
month_df = agg_df[agg_df['Month'] == month]
safe_month = re.sub(r'[^\w\-]', '_', month)
if not safe_month or safe_month.startswith('.'):
logger.warning("Invalid month format after sanitization: %r", escape_log_data(safe_month))
continue
output_file = os.path.join(output_dir, f"q-metrics-{safe_month}.csv")
output_path = Path(output_file).resolve()
if not str(output_path).startswith(str(Path(output_dir).resolve())):
logger.error("Path traversal attempt detected in output file: %r", escape_log_data(output_file))
continue
if month_df.empty:
logger.warning("No data for month %r, skipping report generation", escape_log_data(month))
continue
if len(month_df) > 100000:
logger.warning("Month %r has too many rows (%d). Truncating to 100,000 rows.",
escape_log_data(month), len(month_df))
month_df = month_df.head(100000)
temp_file = output_file + '.tmp'
month_df.to_csv(temp_file, index=False)
os.replace(temp_file, output_file)
os.chmod(output_file, 0o640)
logger.info("Generated report for %s: %s", escape_log_data(month), escape_log_data(output_file))
reports_generated += 1
except (PermissionError, OSError) as e:
logger.error("Error writing to %r: %s", escape_log_data(output_file), escape_log_data(str(e)))
except Exception as e:
logger.error("Error generating report for month %r: %s", escape_log_data(month), escape_log_data(str(e)))
return reports_generated
def generate_user_reports(agg_df, output_dir, specific_user=None):
"""Generate per-user CSV reports.
If specific_user is provided, only generate a report for that user.
Args:
agg_df (DataFrame): Aggregated data frame
output_dir (str): Output directory path
specific_user (str, optional): Specific user name
Returns:
int: Number of reports generated
"""
logger.info("Generating per-user reports")
output_dir = sanitize_output_path(output_dir)
user_dir = os.path.join(output_dir, "users")
try:
os.makedirs(user_dir, exist_ok=True)
except (PermissionError, OSError) as e:
logger.error("Cannot create user directory %s: %s", escape_log_data(user_dir), escape_log_data(str(e)))
return 0
if agg_df.empty:
logger.warning("No data available to generate user reports")
return 0
if 'Name' not in agg_df.columns:
logger.error("Required column 'Name' not found in data")
return 0
users = agg_df['Name'].unique()
if len(users) == 0:
logger.warning("No user data available in the aggregated dataframe")
return 0
reports_generated = 0
for user in users:
if pd.isna(user):
logger.debug("Skipping null user value")
continue
if specific_user and user != specific_user:
continue
try:
user_df = agg_df[agg_df['Name'] == user]
if user_df.empty:
logger.warning("No data for user %r, skipping report generation", escape_log_data(user))
continue
safe_name = re.sub(r'[^\w\-]', '_', str(user))
safe_name = os.path.basename(safe_name)
if not safe_name or safe_name.startswith('.'):
safe_name = f"user_{hash(user) % 10000}"
output_file = os.path.join(user_dir, f"q-metrics-{safe_name}.csv")
user_df.to_csv(output_file, index=False)
logger.info("Generated report for user %r: %s", escape_log_data(user), escape_log_data(output_file))
reports_generated += 1
except (PermissionError, OSError) as e:
logger.error("Error writing to %s: %s", escape_log_data(output_file), escape_log_data(str(e)))
except Exception as e:
logger.error("Error generating report for user %r: %s", escape_log_data(user), escape_log_data(str(e)))
return reports_generated
def main():
"""Main function to process metrics data with enhanced security."""
try:
args = parse_args()
s3_client = get_s3_client()
import signal
def timeout_handler(signum, frame):
logger.error("Operation timed out")
sys.exit(1)
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(600)
subscription_df = read_csv_from_s3(s3_client, args.bucket, args.subscription_path)
if subscription_df is None:
logger.error("Failed to read subscription data. Exiting.")
return 1
subscription_df, id_to_name_map = process_subscription_data(subscription_df)
if subscription_df is None:
logger.error("Failed to process subscription data. Exiting.")
return 1
activity_files = list_activity_files(s3_client, args.bucket, args.activity_prefix, args.month)
if not activity_files:
logger.warning("No activity files found. Proceeding with empty activity data.")
MAX_FILES = 100
if len(activity_files) > MAX_FILES:
logger.warning("Too many activity files found (%d). Limiting to %d files.",
len(activity_files), MAX_FILES)
activity_files = activity_files[:MAX_FILES]
activity_dfs = []
for file_key in activity_files:
if hasattr(signal, 'SIGINFO'):
signal.signal(signal.SIGINFO, lambda signum, frame: logger.info("Processing file: %s", file_key))
df = read_csv_from_s3(s3_client, args.bucket, file_key)
if df is not None:
activity_dfs.append(df)
else:
logger.warning("Skipping invalid activity file: %r", file_key)
activity_df = process_activity_data(activity_dfs, id_to_name_map)
if activity_df.empty and activity_files:
logger.warning("No valid activity data could be processed from %d files", len(activity_files))
merged_df = merge_data(activity_df, subscription_df)
agg_df = aggregate_by_month(merged_df)
if agg_df.empty:
logger.warning("No data available after aggregation. Reports will be empty.")
num_reports = generate_monthly_reports(agg_df, args.output_dir, args.month)
logger.info("Successfully generated %d monthly reports", num_reports)
if args.per_user or args.user:
num_user_reports = generate_user_reports(agg_df, args.output_dir, args.user)
logger.info("Successfully generated %d user reports", num_user_reports)
signal.alarm(0)
return 0
except Exception as e:
logger.error("Error processing metrics: %s", escape_log_data(str(e)))
if logger.level <= logging.DEBUG:
logger.debug("Detailed error information:", exc_info=True)
return 1
finally:
if 'signal' in locals():
try:
signal.alarm(0)
except:
pass
if __name__ == "__main__":
sys.exit(main())
This is an example of how you can invoke it:
python3 process_metrics.py --bucket my-metrics-bucket \
--prefix logs \
--subscription-path user-activities/subscriptions.csv \
--output-dir ./reports \
--month 2025-01 \
--per-user
This command will:
- Read subscription data from
s3://my-metrics-bucket/user-activities/subscriptions.csv
- Look for activity logs in
s3://my-metrics-bucket/logs/AWSLogs/{account-id}/QDeveloperLogs/by_user_analytic/{region}/2025/01/
- Process only data from January 2025 (due to the --month 2025-01 parameter)
- Generate both monthly summary reports and individual user reports (due to --per-user)
- Save all reports to the ./reports directory
In this new era of prompts, I will also share the prompt used as the foundation for the script that processes the user activity data and subscription information stored in S3 and combines them into monthly reports. Note that the original prompt was simpler: I used Q Developer to enhance it based on my initial request. The final code integrates elements from the initial prompt and my own modifications, with the initial prompt likely contributing 80-90% of the final script.
Output example
Table 2 presents an extract of the script’s output, consolidating user interactions during March 2025. The data reveals interesting usage patterns among team members. While Artur shows a strong preference for chat and inline-chat interactions, Uther tends to engage more frequently with the /dev agent. This contrast in usage styles presents an opportunity for knowledge sharing within the team. I plan to ask Artur to demonstrate his workflow and possibly create a tutorial video showcasing how he creatively and effectively uses Q Developer in his daily work. This could inspire Uther and other team members to explore different interaction methods and potentially enhance their productivity through increased chat engagement.
| Name | Month | Chat_MessagesSent | InlineChat_AcceptedLineAdditions | Dev_GeneratedLines |
|------|-------|-------------------|----------------------------------|--------------------|
| artur | Mar-25 | 475 | 206 | 109 |
| uther | Mar-25 | 70 | 8 | 218 |
Table 2 – CSV extract of the script output
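To answer the power-user question from one of the generated monthly reports, you can rank users by whichever metric matters most to your team. Here is a minimal sketch, assuming the script was run with --output-dir ./reports and --month 2025-03, so the report is available locally as ./reports/q-metrics-2025-03.csv with the columns shown in Table 2.
import pandas as pd

# Path follows the script's output naming convention; adjust it to your own run.
report = pd.read_csv("./reports/q-metrics-2025-03.csv")

# Rank users by chat messages sent; swap in any other metric column as needed.
columns = ["Name", "Chat_MessagesSent", "InlineChat_AcceptedLineAdditions", "Dev_GeneratedLines"]
print(report.sort_values("Chat_MessagesSent", ascending=False)[columns].head(10))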
Conclusion
By combining subscription management, the developer dashboard, and user activity reporting, organizations can achieve a holistic understanding of their developers’ usage patterns. With this approach, we can answer the questions posed at the beginning of this blog. Here’s how these features work together:
- How many Q Developer active users are there?
Answer: As a management account administrator within an organization, enable trusted access to view Amazon Q Developer subscriptions and their status from both management and member accounts in a unified list. For a single AWS account, the Amazon Q Developer subscriptions page displays all related subscriptions.
- How can we track usage trends?
Answer: The Amazon Q Developer Dashboard delivers insights into Amazon Q Developer feature usage, helping administrators pinpoint where developers gain the most value as an organization.
- Who are our power users?
Answer: The User Activity Reporting provides detailed metrics on user interactions, allowing administrators to identify top users and usage patterns, and to plan enablement sessions for those who haven’t fully explored Amazon Q Developer.