AWS Big Data Blog

Preprocess and fine-tune LLMs quickly and cost-effectively using Amazon EMR Serverless and Amazon SageMaker

Large language models (LLMs) are becoming increasingly popular, with new use cases constantly being explored. In general, you can build applications powered by LLMs by incorporating prompt engineering into your code. However, there are cases where prompting an existing LLM falls short. This is where model fine-tuning can help. Prompt engineering is about guiding the model’s output by crafting input prompts, whereas fine-tuning is about training the model on custom datasets to make it better suited for specific tasks or domains.

Before you can fine-tune a model, you need to find a task-specific dataset. One dataset that is commonly used is the Common Crawl dataset. The Common Crawl corpus contains petabytes of data, regularly collected since 2008, including raw webpage data, metadata extracts, and text extracts. In addition to choosing a dataset, you need to cleanse and process the data to meet the specific needs of the fine-tuning task.

We recently worked with a customer who wanted to preprocess a subset of the latest Common Crawl dataset and then fine-tune their LLM with cleaned data. The customer was looking for how they could achieve this in the most cost-effective way on AWS. After discussing the requirements, we recommended using Amazon EMR Serverless as their platform for data preprocessing. EMR Serverless is well suited for large-scale data processing and eliminates the need for infrastructure maintenance. In terms of cost, it only charges based on the resources and duration used for each job. The customer was able to preprocess hundreds of TBs of data within a week using EMR Serverless. After they preprocessed the data, they used Amazon SageMaker to fine-tune the LLM.

In this post, we walk you through the customer’s use case and architecture used.

Solution overview

In the following sections, we first introduce the Common Crawl dataset and how to explore and filter the data we need. Amazon Athena only charges for the data size it scans and is used to explore and filter the data quickly, while being cost-effective. EMR Serverless provides a cost-efficient and no-maintenance option for Spark data processing, and is used to process the filtered data. Next, we use Amazon SageMaker JumpStart to fine-tune the Llama 2 model with the preprocessed dataset. SageMaker JumpStart provides a set of solutions for the most common use cases that can be deployed with just a few clicks. You don’t need to write any code to fine-tune an LLM such as Llama 2. Finally, we deploy the fine-tuned model using Amazon SageMaker and compare the differences in text output for the same question between the original and fine-tuned Llama 2 models.

The following diagram illustrates the architecture of this solution.

Prerequisites

Before you dive deep into the solution details, complete the following prerequisite steps:

  1. Create an Amazon Simple Storage Service (Amazon S3) bucket to store the cleaned dataset. For instructions, refer to Create your first S3 bucket.
  2. Set up Athena to run interactive SQL.
  3. Create an EMR Serverless environment.
  4. Prepare Amazon SageMaker Studio to fine-tune your LLM and run Jupyter notebooks. For instructions, refer to Get started.

The Common Crawl dataset

Common Crawl is an open corpus dataset obtained by crawling over 50 billion webpages. It includes massive amounts of unstructured data in multiple languages, starting from 2008 and reaching the petabyte level. It is continuously updated.

In the training of GPT-3, the Common Crawl dataset accounts for 60% of its training data, as shown in the following diagram (source: Language Models are Few-Shot Learners).

Another important dataset worth mentioning is the C4 dataset. C4, short for Colossal Clean Crawled Corpus, is a dataset derived from postprocessing the Common Crawl dataset. In Meta’s LLaMA paper, they outlined the datasets used, with Common Crawl accounting for 67% (utilizing 3.3 TB of data) and C4 for 15% (utilizing 783 GB of data). The paper emphasizes the significance of incorporating differently preprocessed data for enhancing model performance. Despite the original C4 data being part of Common Crawl, Meta opted for the reprocessed version of this data.

In this section, we cover common ways to interact with, filter, and process the Common Crawl dataset.

Common Crawl data

The Common Crawl raw dataset includes three types of data files: raw webpage data (WARC), metadata (WAT), and text extraction (WET).

Data collected after 2013 is stored in WARC format and includes corresponding metadata (WAT) and text extraction data (WET). The dataset is located in Amazon S3, updated on a monthly basis, and can be accessed directly through AWS Marketplace.

For example, the following snippet is data from June of 2023:

$ aws s3 ls s3://commoncrawl/crawl-data/CC-MAIN-2023-23/
                           PRE segments/
2023-06-21 00:34:08       2164 cc-index-table.paths.gz
2023-06-21 00:34:08        637 cc-index.paths.gz
2023-06-21 05:52:05       2724 index.html
2023-06-21 00:34:09     161064 non200responses.paths.gz
2023-06-21 00:34:10     160888 robotstxt.paths.gz
2023-06-21 00:34:10        480 segment.paths.gz
2023-06-21 00:34:11     161082 warc.paths.gz
2023-06-21 00:34:12     160895 wat.paths.gz
2023-06-21 00:34:12     160898 wet.paths.gz

cc-index-table

The Common Crawl dataset also provides an index table for filtering data, which is called cc-index-table.

The cc-index-table is an index of the existing data, providing a table-based index of WARC files. It allows for easy lookup of information, such as which WARC file corresponds to a specific URL.
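Because the index exposes the warc_filename, warc_record_offset, and warc_record_length columns, a single capture can in principle be retrieved with one ranged read instead of downloading a whole WARC file. The following is a minimal sketch of how those three values map to an HTTP request. The HTTPS base URL and the helper name record_request are assumptions for illustration, and the sample row is made up.

```python
from typing import Dict, Tuple

# Assumption: public HTTPS endpoint that fronts the Common Crawl bucket.
CC_HTTP_BASE = "https://data.commoncrawl.org/"


def record_request(warc_filename: str, offset: int, length: int) -> Tuple[str, Dict[str, str]]:
    """Return the URL and HTTP headers needed to fetch one WARC record.

    Hypothetical helper: the arguments correspond to the warc_filename,
    warc_record_offset, and warc_record_length index columns.
    """
    if offset < 0 or length <= 0:
        raise ValueError("offset must be >= 0 and length must be > 0")
    # HTTP byte ranges are inclusive on both ends.
    last_byte = offset + length - 1
    url = CC_HTTP_BASE + warc_filename.lstrip("/")
    return url, {"Range": f"bytes={offset}-{last_byte}"}


# Illustrative index row (not real data).
url, headers = record_request(
    "crawl-data/CC-MAIN-2023-23/segments/1685224643388.45/warc/example.warc.gz",
    offset=1000,
    length=500,
)
print(url)
print(headers)
```

The returned URL and headers can be passed to any HTTP client. The response body is a gzip-compressed WARC record that a library such as warcio can parse.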

The Common Crawl GitHub repo provides corresponding Athena statements to query the index. For explanations of each field, refer to Common Crawl Index Athena.

For example, you can create an Athena table to map cc-index data with the following code:

CREATE EXTERNAL TABLE IF NOT EXISTS ccindex (
  url_surtkey                   STRING,
  url                           STRING,
  url_host_name                 STRING,
  url_host_tld                  STRING,
  url_host_2nd_last_part        STRING,
  url_host_3rd_last_part        STRING,
  url_host_4th_last_part        STRING,
  url_host_5th_last_part        STRING,
  url_host_registry_suffix      STRING,
  url_host_registered_domain    STRING,
  url_host_private_suffix       STRING,
  url_host_private_domain       STRING,
  url_host_name_reversed        STRING,
  url_protocol                  STRING,
  url_port                      INT,
  url_path                      STRING,
  url_query                     STRING,
  fetch_time                    TIMESTAMP,
  fetch_status                  SMALLINT,
  fetch_redirect                STRING,
  content_digest                STRING,
  content_mime_type             STRING,
  content_mime_detected         STRING,
  content_charset               STRING,
  content_languages             STRING,
  content_truncated             STRING,
  warc_filename                 STRING,
  warc_record_offset            INT,
  warc_record_length            INT,
  warc_segment                  STRING)
PARTITIONED BY (
  crawl                         STRING,
  subset                        STRING)
STORED AS parquet
LOCATION 's3://commoncrawl/cc-index/table/cc-main/warc/';

-- add partitions
MSCK REPAIR TABLE ccindex;

-- query
SELECT *
FROM ccindex
WHERE crawl = 'CC-MAIN-2018-05'
  AND subset = 'warc'
  AND url_host_tld = 'no'
LIMIT 10;

The preceding SQL statements demonstrate how to create an Athena table, add partitions, and run a query.

Filter data from the Common Crawl dataset

As you can see from the CREATE TABLE statement, there are several fields that can help filter the data. For example, if you want to list up to 10,000 Chinese documents from specific crawls, the SQL statement could be as follows:

SELECT
  url,
  warc_filename,
  content_languages
FROM ccindex
WHERE (crawl = 'CC-MAIN-2023-14'
  OR crawl = 'CC-MAIN-2023-23')
  AND subset = 'warc'
  AND content_languages = 'zho'
LIMIT 10000

If you want to do further processing, you can save the results to another S3 bucket.
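If you run this kind of filter repeatedly for different crawls or languages, it can help to generate the statement from parameters. The following is a minimal sketch. The function name build_language_query and the validation patterns are assumptions for illustration, and the generated SQL uses IN rather than chained OR conditions.

```python
import re
from typing import Sequence

# Assumed formats: crawl IDs like CC-MAIN-2023-14, three-letter language codes like zho.
CRAWL_ID = re.compile(r"^CC-MAIN-\d{4}-\d{2}$")
LANG_CODE = re.compile(r"^[a-z]{3}$")


def build_language_query(crawls: Sequence[str], language: str, limit: int) -> str:
    """Build a ccindex query for one language across several crawls (hypothetical helper)."""
    if not crawls:
        raise ValueError("at least one crawl ID is required")
    for crawl in crawls:
        # Validate before interpolating so arbitrary text cannot reach the SQL string.
        if not CRAWL_ID.match(crawl):
            raise ValueError(f"unexpected crawl ID: {crawl!r}")
    if not LANG_CODE.match(language):
        raise ValueError(f"unexpected language code: {language!r}")
    if limit <= 0:
        raise ValueError("limit must be positive")
    crawl_list = ", ".join(f"'{crawl}'" for crawl in crawls)
    return (
        "SELECT url, warc_filename, content_languages\n"
        "FROM ccindex\n"
        f"WHERE crawl IN ({crawl_list})\n"
        "  AND subset = 'warc'\n"
        f"  AND content_languages = '{language}'\n"
        f"LIMIT {int(limit)}"
    )


query = build_language_query(["CC-MAIN-2023-14", "CC-MAIN-2023-23"], "zho", 10000)
print(query)
```

The resulting string can be pasted into the Athena query editor or submitted through whichever client you already use.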

Analyze the filtered data

The Common Crawl GitHub repository provides several PySpark examples for processing the raw data.

Let’s look at an example of running server_count.py (example script provided by the Common Crawl GitHub repo) on the data located in s3://commoncrawl/crawl-data/CC-MAIN-2023-23/segments/1685224643388.45/warc/.

First, you need a Spark environment, such as EMR Spark. For example, you can launch an Amazon EMR on EC2 cluster in us-east-1 (because the dataset is in us-east-1). Using an EMR on EC2 cluster can help you carry out tests before submitting jobs to the production environment.

After launching an EMR on EC2 cluster, log in to the primary node of the cluster using SSH. Then, package the Python environment and submit the script (refer to the Conda documentation to install Miniconda):

# create conda environment
conda create -y -n example -c dmnapolitano python=3.7 botocore boto3 ujson requests conda-pack warcio

# package the conda env
conda activate example
conda pack -o environment.tar.gz

# get script from common crawl github
git clone https://github.com/commoncrawl/cc-pyspark.git

# copy target file path to local
aws s3 cp s3://commoncrawl/crawl-data/CC-MAIN-2023-23/warc.paths.gz .
gzip -d warc.paths.gz

# put warc list to hdfs
hdfs dfs -put warc.paths

# submit job (no whitespace may follow a line-continuation backslash)
spark-submit --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
--conf spark.sql.warehouse.dir=s3://xxxx-common-crawl/output/ \
--master yarn \
--deploy-mode cluster \
--archives environment.tar.gz#environment \
--py-files cc-pyspark/sparkcc.py cc-pyspark/server_count.py --input_base_url s3://commoncrawl/ ./warc.paths count_demo

It can take time to process all references in the warc.paths file. For demo purposes, you can improve the processing time with the following strategies:

  • Download the file s3://commoncrawl/crawl-data/CC-MAIN-2023-23/warc.paths.gz to your local machine, unzip it, and then upload it to HDFS or Amazon S3. Gzip-compressed files are not splittable, so you need to unzip the file to process it in parallel.
  • Modify the warc.paths file to keep only two lines and delete the rest, which makes the job run much faster.
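Both strategies can be combined in a few lines of Python: decompress the listing and keep only its first entries in a single pass. The following is a minimal sketch. The function name keep_first_paths is hypothetical, and the demo runs on a synthetic listing with placeholder paths rather than on real Common Crawl data.

```python
import gzip
import tempfile
from itertools import islice
from pathlib import Path


def keep_first_paths(gz_path, out_path, keep=2):
    """Decompress a .paths.gz listing and write only its first `keep` lines.

    Returns the number of lines written (hypothetical helper).
    """
    written = 0
    with gzip.open(gz_path, "rt", encoding="utf-8") as src, \
            open(out_path, "w", encoding="utf-8") as dst:
        for line in islice(src, keep):
            dst.write(line)
            written += 1
    return written


if __name__ == "__main__":
    # Self-contained demo on a synthetic listing (the paths are placeholders).
    with tempfile.TemporaryDirectory() as tmp:
        gz_path = Path(tmp) / "warc.paths.gz"
        with gzip.open(gz_path, "wt", encoding="utf-8") as f:
            for i in range(5):
                f.write(f"crawl-data/example/warc/file-{i:05d}.warc.gz\n")
        out_path = Path(tmp) / "warc.paths"
        print(keep_first_paths(gz_path, out_path, keep=2))
        print(out_path.read_text(encoding="utf-8"), end="")
```

The uncompressed, shortened file can then be uploaded to HDFS or Amazon S3 as described above.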

After the job is complete, you can see the result in s3://xxxx-common-crawl/output/, in Parquet format.

Implement customized processing logic

The Common Crawl GitHub repo provides a common approach to process WARC files. Generally, you can extend the CCSparkJob to override a single method (process_record), which is sufficient for many cases.

Let’s look at an example to get the IMDB reviews of recent movies. First, you need to filter the index for pages on the IMDB site:

SELECT
  url,
  warc_filename,
  url_host_name
FROM ccindex
WHERE (crawl = 'CC-MAIN-2023-06'
  OR crawl = 'CC-MAIN-2023-40')
  AND subset = 'warc'
  AND url like 'https://www.imdb.com/title/%/reviews'
LIMIT 1000

Then you can get WARC file lists that contain IMDB review data, and save the WARC file names as a list in a text file.

Alternatively, you can use EMR Spark to get the WARC file list and store it in Amazon S3. For example:

sql = """SELECT
  warc_filename
FROM ccindex
WHERE (crawl = 'CC-MAIN-2023-06'
  OR crawl = 'CC-MAIN-2023-40')
  AND subset = 'warc'
  AND url like 'https://www.imdb.com/title/%/reviews'
"""

warc_list = spark.sql(sql)

# write result list to s3
warc_list.coalesce(1).write.mode("overwrite").text("s3://xxxx-common-crawl/warclist/imdb_warclist")

The output file should look similar to s3://xxxx-common-crawl/warclist/imdb_warclist/part-00000-6af12797-0cdc-4ef2-a438-cf2b935f2ffd-c000.txt.

The next step is to extract user reviews from these WARC files. You can extend the CCSparkJob to override the process_record() method:

from sparkcc import CCSparkJob
from bs4 import BeautifulSoup
from urllib.parse import urlsplit


class IMDB_Extract_Job(CCSparkJob):
    name = "IMDB_Reviews"

    def process_record(self, record):
        if self.is_response_record(record):
            # WARC response record
            domain = urlsplit(record.rec_headers['WARC-Target-URI']).hostname
            if domain == 'www.imdb.com':
                # get web contents
                contents = (
                    record.content_stream()
                        .read()
                        .decode("utf-8", "replace")
                )

                # parse with beautiful soup
                soup = BeautifulSoup(contents, "html.parser")

                # get reviews
                review_divs = soup.find_all(class_="text show-more__control")
                for div in review_divs:
                    # emit a (review text, 1) pair
                    yield div.text, 1


if __name__ == "__main__":
    job = IMDB_Extract_Job()
    job.run()

You can save the preceding script as imdb_extractor.py, which you’ll use in the following steps. After you have prepared the data and scripts, you can use EMR Serverless to process the filtered data.
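To make the shape of the job output concrete, the following pure-Python sketch mimics what happens to the (review text, 1) pairs yielded by process_record. It assumes the base CCSparkJob sums the values per key, as sparkcc.py in the cc-pyspark repository does at the time of writing; check this against your checkout. The function names and sample reviews are made up for illustration, and no Spark is involved.

```python
from collections import Counter
from typing import Dict, Iterable, Iterator, Tuple


def emit_pairs(reviews: Iterable[str]) -> Iterator[Tuple[str, int]]:
    """Stand-in for process_record: one (text, 1) pair per extracted review."""
    for text in reviews:
        yield text, 1


def sum_by_key(pairs: Iterable[Tuple[str, int]]) -> Dict[str, int]:
    """Stand-in for the assumed reduce step: add up the values for each key."""
    totals: Counter = Counter()
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


# Illustrative input: the same review text captured twice.
sample_reviews = ["Great film.", "Too long.", "Great film."]
result = sum_by_key(emit_pairs(sample_reviews))
print(result)  # {'Great film.': 2, 'Too long.': 1}
```

Under that assumption, each distinct review text appears once in the output together with the number of times it was seen, so duplicate captures of the same page do not duplicate the review.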

EMR Serverless

EMR Serverless is a serverless deployment option to run big data analytics applications using open source frameworks like Apache Spark and Hive without configuring, managing, and scaling clusters or servers.

With EMR Serverless, you can run analytics workloads at any scale with automatic scaling that resizes resources in seconds to meet changing data volumes and processing requirements. EMR Serverless automatically scales resources up and down to provide the right amount of capacity for your application, and you only pay for what you use.

Processing the Common Crawl dataset is generally a one-time processing task, making it suitable for EMR Serverless workloads.

Create an EMR Serverless application

You can create an EMR Serverless application on the EMR Studio console. Complete the following steps:

  1. On the EMR Studio console, choose Applications under Serverless in the navigation pane.
  2. Choose Create application.
  3. Provide a name for the application and choose an Amazon EMR version.
  4. If access to VPC resources is required, add a customized network setting.
  5. Choose Create application.

Your Spark serverless environment will then be ready.

Before you can submit a job to EMR Serverless, you still need to create an execution role. Refer to Getting started with Amazon EMR Serverless for more details.

Process Common Crawl data with EMR Serverless

After your EMR Serverless Spark application is ready, complete the following steps to process the data:

  1. Prepare a Conda environment and upload it to Amazon S3, which will be used as the environment in EMR Serverless.
  2. Upload the scripts to be run to an S3 bucket. In the following example, there are two scripts:
    1. imdb_extractor.py – Customized logic to extract contents from the dataset. The contents can be found earlier in this post.
    2. cc-pyspark/sparkcc.py – The example PySpark framework from the Common Crawl GitHub repo, which must be included.
  3. Submit the PySpark job to EMR Serverless. Define the following parameters to run this example in your environment:
    1. application-id – The application ID of your EMR Serverless application.
    2. execution-role-arn – Your EMR Serverless execution role. To create it, refer to Create a job runtime role.
    3. WARC file location – The location of your WARC files. s3://xxxx-common-crawl/warclist/imdb_warclist/part-00000-6af12797-0cdc-4ef2-a438-cf2b935f2ffd-c000.txt contains the filtered WARC file list, which you obtained earlier in this post.
    4. spark.sql.warehouse.dir – The default warehouse location (use your S3 directory).
    5. spark.archives – The S3 location of the prepared Conda environment.
    6. spark.submit.pyFiles – The prepared PySpark script sparkcc.py.

See the following code:

# 1. create conda environment
conda create -y -n imdb -c dmnapolitano python=3.7 botocore boto3 ujson requests conda-pack warcio bs4

# 2. package the conda env, and upload to s3
conda activate imdb
conda pack -o imdbenv.tar.gz
aws s3 cp imdbenv.tar.gz s3://xxxx-common-crawl/env/

# 3. upload scripts to S3
aws s3 cp imdb_extractor.py s3://xxxx-common-crawl/scripts/
aws s3 cp cc-pyspark/sparkcc.py s3://xxxx-common-crawl/scripts/

# 4. submit job to EMR Serverless
aws emr-serverless start-job-run \
    --application-id 00fdsobht2skro2l \
    --execution-role-arn arn:aws:iam::xxxx:role/EMR-Serverless-JobExecutionRole \
    --name imdb-retrive \
    --job-driver '{
        "sparkSubmit": {
          "entryPoint": "s3://xxxx-common-crawl/scripts/imdb_extractor.py",
          "entryPointArguments": ["--input_base_url", "s3://commoncrawl/", "s3://xxxx-common-crawl/warclist/imdb_warclist/part-00000-6af12797-0cdc-4ef2-a438-cf2b935f2ffd-c000.txt", "imdb_reviews", "--num_output_partitions", "1"],
          "sparkSubmitParameters": "--conf spark.sql.warehouse.dir=s3://xxxx-common-crawl/output/ --conf spark.network.timeout=10000000 --conf spark.executor.heartbeatInterval=10000000 --conf spark.executor.instances=100 --conf spark.executor.cores=4 --conf spark.executor.memory=16g --conf spark.driver.memory=16g --conf spark.archives=s3://xxxx-common-crawl/env/imdbenv.tar.gz#environment --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.submit.pyFiles=s3://xxxx-common-crawl/scripts/sparkcc.py"
        }
}'
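When the list of Spark settings grows, it can be easier to assemble the job-driver JSON programmatically than to edit one long quoted string by hand. The following is a minimal sketch. The helper names build_spark_submit_parameters and build_job_driver are hypothetical, and only a subset of the settings from the command above is shown. The helper also rejects non-ASCII characters and whitespace, which catches typographic dashes or quotes pasted in from a formatted document.

```python
import json
from typing import Dict, List


def build_spark_submit_parameters(conf: Dict[str, str]) -> str:
    """Join Spark settings into a '--conf key=value ...' string (hypothetical helper)."""
    parts = []
    for key, value in conf.items():
        setting = f"{key}={value}"
        # Reject whitespace and non-ASCII characters such as typographic dashes.
        if not setting.isascii() or any(ch.isspace() for ch in setting):
            raise ValueError(f"invalid Spark setting: {setting!r}")
        parts.append(f"--conf {setting}")
    return " ".join(parts)


def build_job_driver(entry_point: str, arguments: List[str], conf: Dict[str, str]) -> str:
    """Return the JSON document expected by the --job-driver option."""
    return json.dumps(
        {
            "sparkSubmit": {
                "entryPoint": entry_point,
                "entryPointArguments": arguments,
                "sparkSubmitParameters": build_spark_submit_parameters(conf),
            }
        },
        indent=2,
    )


# Values copied from the example above; replace xxxx with your own bucket prefix.
conf = {
    "spark.sql.warehouse.dir": "s3://xxxx-common-crawl/output/",
    "spark.executor.cores": "4",
    "spark.executor.memory": "16g",
    "spark.archives": "s3://xxxx-common-crawl/env/imdbenv.tar.gz#environment",
    "spark.submit.pyFiles": "s3://xxxx-common-crawl/scripts/sparkcc.py",
}
job_driver = build_job_driver(
    "s3://xxxx-common-crawl/scripts/imdb_extractor.py",
    [
        "--input_base_url",
        "s3://commoncrawl/",
        "s3://xxxx-common-crawl/warclist/imdb_warclist/part-00000-6af12797-0cdc-4ef2-a438-cf2b935f2ffd-c000.txt",
        "imdb_reviews",
        "--num_output_partitions",
        "1",
    ],
    conf,
)
print(job_driver)
```

The printed JSON can be saved to a file or passed as the value of --job-driver.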

After the job is complete, the extracted reviews are stored in Amazon S3. To check the contents, you can use Amazon S3 Select, as shown in the following screenshot.

Considerations

The following are the points to consider when dealing with massive amounts of data with customized code:

  • Some third-party Python libraries may not be available in Conda. In such cases, you can switch to a Python virtual environment to build the PySpark runtime environment.
  • If there is a massive amount of data to be processed, try to create and use multiple EMR Serverless Spark applications to parallelize it. Each application deals with a subset of file lists.
  • You may encounter a slowdown issue with Amazon S3 when filtering or processing the Common Crawl data. This is because the S3 bucket storing the data is publicly accessible, and other users may access the data at the same time. To mitigate this issue, you can add a retry mechanism or sync specific data from the Common Crawl S3 bucket to your own bucket.
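The last two considerations can be sketched in plain Python: splitting a WARC file list into one sublist per application, and wrapping a read in a retry loop with exponential backoff. This is a minimal illustration under stated assumptions. The function names, default delays, and the choice of randomized ("full jitter") backoff are not from the original solution, and in a real job you would restrict retry_on to the throttling errors your S3 client actually raises.

```python
import random
import time
from typing import Callable, List, Sequence, Tuple, Type, TypeVar

T = TypeVar("T")


def split_round_robin(paths: Sequence[str], n_parts: int) -> List[List[str]]:
    """Distribute paths across n_parts sublists, one per application."""
    if n_parts <= 0:
        raise ValueError("n_parts must be positive")
    return [list(paths[i::n_parts]) for i in range(n_parts)]


def with_retries(
    func: Callable[[], T],
    attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on failure with randomized exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return func()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, delay))
    raise RuntimeError("unreachable")


# Illustrative usage with a function that fails twice before succeeding.
state = {"calls": 0}


def flaky_read() -> str:
    state["calls"] += 1
    if state["calls"] < 3:
        raise IOError("simulated slowdown")
    return "payload"


print(split_round_robin(["a", "b", "c", "d", "e"], 2))
print(with_retries(flaky_read, base_delay=0.0))
```

Round-robin assignment keeps the sublists within one item of each other in size, which is usually enough when WARC files are of similar size.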

Fine-tune Llama 2 with SageMaker

After the data is prepared, you can fine-tune a Llama 2 model with it. You can do so using SageMaker JumpStart, without writing any code. For more information, refer to Fine-tune Llama 2 for text generation on Amazon SageMaker JumpStart.

In this scenario, you carry out domain adaptation fine-tuning. For this type of fine-tuning, the input consists of a CSV, JSON, or TXT file. You need to put all review data in a TXT file. To do so, you can submit a straightforward Spark job to EMR Serverless. See the following sample code snippet:

# disable generating _SUCCESS file
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

data = spark.read.parquet("s3://xxxx-common-crawl/output/imdb_reviews/")

data.select('Key').coalesce(1).write.mode("overwrite").text("s3://xxxx-common-crawl/llama2/train/")

After you prepare the training data, enter the data location for Training data set, then choose Train.

You can track the training job status.

Evaluate the fine-tuned model

After training is complete, choose Deploy in SageMaker JumpStart to deploy your fine-tuned model.

After the model is successfully deployed, choose Open Notebook, which redirects you to a prepared Jupyter notebook where you can run your Python code.

You can use the image Data Science 2.0 and the Python 3 kernel for the notebook.

Then, you can evaluate the fine-tuned model and the original model in this notebook.

import json

import boto3

endpoint_name_original = "jumpstart-dft-meta-textgeneration-llama-2-7b-origin"
endpoint_name_fine_tuned = "jumpstart-ftc-meta-textgeneration-llama-2-7b"

payload = {
    "inputs": "The review of movie 'A Woman of Paris: A Drama of Fate' is ",
    "parameters": {
        "max_new_tokens": 256,
        "top_p": 0.9,
        "temperature": 0.6,
        "return_full_text": True,
    },
}


def query_endpoint(payload, endpoint_name):
    # send the prompt to the endpoint and print the generated text
    client = boto3.client("sagemaker-runtime")
    response = client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="application/json",
        Body=json.dumps(payload),
        CustomAttributes="accept_eula=true",
    )
    response = response["Body"].read().decode("utf8")
    response = json.loads(response)
    print(endpoint_name + ": \n" + response[0]['generation'])


query_endpoint(payload, endpoint_name_original)
print("\n-----#################-----\n")
query_endpoint(payload, endpoint_name_fine_tuned)

The following are two responses returned by the original model and fine-tuned model for the same question.

We provided both models with the same sentence: “The review of movie ‘A Woman of Paris: A Drama of Fate’ is” and let them complete the sentence.

The original model outputs meaningless sentences:

"The review of movie 'A woman of Paris: A Drama of Fate' is 3.0/5.

A Woman of Paris: A Drama of Fate(1923)

A Woman of Paris: A Drama of Fate movie released on 17 October, 1992. The movie is directed by. A Woman of Paris: A Drama of Fate featured Jeanne Eagles, William Haines, Burr McIntosh and Jack Rollens in lead rols.

..."

In contrast, the fine-tuned model’s outputs are more like a movie review:

" The review of movie 'A Woman of Paris: A Drama of Fate' is 6.3/10. I liked the story, the plot, the character, the background. The performances are amazing. Rory (Judy Davis) is an Australian photographer who travels to Africa to photograph the people, wildlife, and scenery. She meets Peter (Donald Sutherland), a zoologist, and they begin a relationship..."

For this prompt, the fine-tuned model produces output that reads more like a movie review than the original model’s output does.

Clean up

After you finish this exercise, complete the following steps to clean up your resources:

  1. Delete the S3 bucket that stores the cleaned dataset.
  2. Stop the EMR Serverless environment.
  3. Delete the SageMaker endpoint that hosts the LLM.
  4. Delete the SageMaker domain that runs your notebooks.

The application you created should stop automatically after 15 minutes of inactivity by default.

Generally, you don’t need to clean up the Athena environment because there are no charges when you’re not using it.

Conclusion

In this post, we introduced the Common Crawl dataset and how to use EMR Serverless to process the data for LLM fine-tuning. Then we demonstrated how to use SageMaker JumpStart to fine-tune the LLM and deploy it without any code. For more use cases of EMR Serverless, refer to Amazon EMR Serverless. For more information about hosting and fine-tuning models on Amazon SageMaker JumpStart, refer to the SageMaker JumpStart documentation.


About the Authors

Shijian Tang is an Analytics Specialist Solution Architect at Amazon Web Services.

Matthew Liem is a Senior Solution Architecture Manager at Amazon Web Services.

Dalei Xu is an Analytics Specialist Solution Architect at Amazon Web Services.

Yuanjun Xiao is a Senior Solution Architect at Amazon Web Services.