AWS Database Blog
Loading data into Amazon Keyspaces with cqlsh
The Cassandra Query Language Shell (cqlsh) is an open-source command line shell that you can use to run CQL commands and perform database administrative tasks, such as creating and modifying tables. You can use cqlsh to get started with Amazon Keyspaces (for Apache Cassandra)—a scalable, highly available, managed Cassandra-compatible database—by loading data from a CSV file into your Amazon Keyspaces tables. For more information, see What Is Amazon Keyspaces (for Apache Cassandra)?
This post walks you through how to use cqlsh to load data into an Amazon Keyspaces table by using the cqlsh COPY command. It also shares best practices for preparing the data and for tuning the performance of the data transfer through the COPY command parameters. Finally, this post discusses how to configure the read/write throughput settings of Amazon Keyspaces tables to optimize the data load process.
Prerequisites
Before you begin, you need an AWS account for your Amazon Keyspaces resources. Make sure you have connected programmatically and set up cqlsh correctly. For instructions, see Connecting Programmatically to Amazon Keyspaces (for Apache Cassandra).
To work through the examples in this post, you need a CSV file with data. This post refers to that CSV file as export_keyspace_table.csv, but you can replace it with another name. Make sure the column headers in the CSV source data file match the column names of the target table. If they don’t match, you have to map the CSV headers to the table columns, which this post doesn’t cover.
The following code creates matching table columns:
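As a sketch (the keyspace, table, and column names here are hypothetical), the CSV header row and the table definition line up like this:

```
-- First line of export_keyspace_table.csv:
--   id,name,price,created_at
-- The target table defines the same column names:
CREATE TABLE catalog.products (
    id int PRIMARY KEY,
    name text,
    price decimal,
    created_at timestamp
);
```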
Creating a source CSV file from cqlsh COPY TO
If your source data is in an Apache Cassandra database, you can use the cqlsh COPY TO command to generate a CSV file. See the following code:
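For example, from a cqlsh session connected to the source cluster (the keyspace and table names are placeholders; the file name matches the one used in this post):

```
COPY catalog.products TO 'export_keyspace_table.csv' WITH HEADER = true;
```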
Using a CSV from a different source
If you’re using a different data source to create a CSV, make sure the first row contains the column names and that the data is delimited with a comma (neither is strictly required, but it allows you to use the default settings in cqlsh COPY).
Additionally, make sure all the data values are valid Cassandra data types. For more information, see Data Types on the Apache Cassandra website.
Creating the target table
If Apache Cassandra is the original data source, a simple way to create the Amazon Keyspaces tables (and make sure the CSV headers match) is to generate the CREATE TABLE statement from the source table. See the following code:
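One way to capture the statement (a sketch; the host, keyspace, and table names are placeholders) is to run DESCRIBE against the source cluster and save the output:

```
cqlsh your-cassandra-host 9042 -e "DESCRIBE TABLE catalog.products" > create_table.cql
```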
Create the keyspace and table in Amazon Keyspaces if you haven’t already done so. You use this table as the destination for the data load.
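A minimal sketch, reusing the placeholder names from above (SingleRegionStrategy is the replication class that Amazon Keyspaces uses):

```
CREATE KEYSPACE IF NOT EXISTS catalog
    WITH REPLICATION = {'class': 'SingleRegionStrategy'};

-- Reuse or adapt the CREATE TABLE statement captured from the source cluster:
CREATE TABLE IF NOT EXISTS catalog.products (
    id int PRIMARY KEY,
    name text,
    price decimal,
    created_at timestamp
);
```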
Preparing the data
To prepare the source data for an efficient transfer, you should randomize it. Next, analyze the data to determine the appropriate cqlsh parameter values and table settings.
Randomizing the data
The cqlsh COPY FROM command reads and writes data in the same order that it appears in the CSV file. If you use the cqlsh COPY TO command to create the source file, the data is written in key-sorted order in the CSV. Internally, Amazon Keyspaces partitions data using partition keys. Although Amazon Keyspaces has built-in logic to help load balance requests for the same partition key, loading the data is faster and more efficient if you randomize the order because you can take advantage of the built-in load balancing of writing to different partitions.
To spread the writes across the partitions evenly, you must randomize the data in the source file. You can write an application to do this or use an open-source tool, such as Shuf. Shuf is freely available on Linux distributions, on macOS by installing coreutils in homebrew, and on Windows by using Windows Subsystem for Linux (WSL).
To randomize the source file on Linux or Windows, enter the following code:
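A minimal sketch, assuming the file names used in this post; it keeps the header row in place and shuffles only the data rows:

```
head -n 1 export_keyspace_table.csv > keyspace.table.csv
tail -n +2 export_keyspace_table.csv | shuf >> keyspace.table.csv
```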
To randomize the source file on macOS, enter the following code:
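On macOS, coreutils from Homebrew installs GNU shuf as gshuf, so the equivalent sketch is:

```
brew install coreutils    # if you haven't already
head -n 1 export_keyspace_table.csv > keyspace.table.csv
tail -n +2 export_keyspace_table.csv | gshuf >> keyspace.table.csv
```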
Shuf rewrites the data to a new CSV file called keyspace.table.csv. You can delete the export_keyspace_table.csv file; you no longer need it.
Analyzing the data
Determine the average and maximum row size by analyzing the data. You do this for the following two reasons:
- The average row size helps determine the amount of data to transfer.
- You can make sure all the rows are less than 1 MB in size, which is the maximum row size in Amazon Keyspaces.
This quota refers to row size, not partition size. Unlike Apache Cassandra partitions, Amazon Keyspaces partitions can be virtually unbounded in size. In addition, partition keys and clustering columns require extra storage for indexing, which you must add to the raw size of rows. For more information, see Quotas.
Handling rows greater than 1 MB isn’t a focus for this post, but if you have such rows, you can explore one of these options:
- Split the data into smaller rows (you may need to use multiple tables depending on your access patterns)
- Store the data in an object store, such as Amazon S3, and keep a reference to the object in Amazon Keyspaces
The following code uses AWK to analyze a CSV file and print the average and maximum row size:
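A minimal sketch, assuming the shuffled file from the previous step; it treats each line’s length as an approximation of row size:

```
awk '{ bytes = length($0); total += bytes; if (bytes > max) max = bytes }
     END { printf("Average row size: %.0f bytes\nMaximum row size: %d bytes\nRows: %d\n",
                  total/NR, max, NR) }' keyspace.table.csv
```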
You receive the following output:
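The values below are illustrative; they line up with the 3,000-byte average and 360,000-row figures used in the examples later in this post:

```
Average row size: 3000 bytes
Maximum row size: 14624 bytes
Rows: 360000
```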
Setting table throughput capacity mode
With Amazon Keyspaces, you only pay for the resources you use. Amazon Keyspaces offers two throughput capacity modes: on-demand and provisioned. With on-demand mode, you pay based on the actual reads and writes your applications perform. With provisioned capacity, you can optimize the cost of reads and writes by setting in advance how much data your applications can read and write per second from tables. You can use either mode to load data into your tables. For more information, see Read/Write Capacity Mode.
This post walks you through how to tune cqlsh to load data within a set time range. Because you know how many reads and writes you perform in advance, use provisioned capacity mode. After you finish the data transfer, you should set the capacity mode to match your application’s traffic patterns.
With provisioned capacity mode, you specify how much read and write capacity you want to provision to your table in advance. Write capacity is billed hourly and metered in write capacity units (WCUs). Each WCU is enough write capacity to support writing 1 KB of data per second. When you load the data, the write rate must be under the max WCUs (parameter: write_capacity_units) set on the target table. By default, you can provision up to 40,000 WCUs to a table and 80,000 WCUs across all the tables in your account. If you need additional capacity, you can request a quota increase through AWS Support.
Calculating the average number of WCUs required for an insert
Inserting 1 KB of data per second requires 1 WCU. If your CSV file has 360,000 rows, and you want to load all the data in 1 hour, you must write 100 rows per second (360,000 rows / 60 minutes / 60 seconds = 100 rows per second). If each row has up to 1 KB of data, to insert 100 rows per second, you must provision 100 WCUs to your table. If each row has 1.5 KB of data, you need two WCUs to insert one row per second; therefore, to insert 100 rows per second, you must provision 200 WCUs.
To determine how many WCUs you need to insert one row per second, divide the average row size in bytes by 1024 and round up to the nearest whole number.
For example, if the average row size is 3000 bytes, you need three WCUs to insert one row per second:
ROUNDUP(3000 / 1024) = ROUNDUP(2.93) = 3 WCUs.
Calculating data load time and capacity
Now that you know the average size and number of rows in your CSV file, you can calculate how many WCUs you need to load the data in a given amount of time, and the approximate time it takes to load all the data in your CSV using different WCU settings.
For example, if each row in your file is 1 KB and you have 1,000,000 rows in your CSV file, to load the data in 1 hour, you need to provision at least 278 WCUs to your table for that hour:

1,000,000 rows * 1 KB = 1,000,000 KB
1,000,000 KB / 3,600 seconds = 277.8 KB per second = 278 WCUs
Configuring provisioned capacity settings
You can set a table’s write capacity settings when you create the table or by using the ALTER TABLE command. The syntax for altering a table’s provisioned capacity settings with the ALTER TABLE command is as follows:
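A sketch with placeholder names, using the 278 WCUs calculated in the previous section (the read setting is shown only for completeness; adjust both values to your needs):

```
ALTER TABLE catalog.products
WITH CUSTOM_PROPERTIES = {
    'capacity_mode': {
        'throughput_mode': 'PROVISIONED',
        'read_capacity_units': 100,
        'write_capacity_units': 278
    }
};
```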
For more information, see ALTER TABLE in the Amazon Keyspaces Developer Guide.
Configuring cqlsh COPY settings
You now determine the parameter values for cqlsh COPY. The cqlsh COPY command reads the CSV file you prepared earlier and inserts the data into Amazon Keyspaces using CQL. cqlsh divides up the rows and distributes the INSERT operations among a set of workers. Each worker establishes a connection with Amazon Keyspaces and sends INSERT requests along this channel. The cqlsh COPY command doesn’t have internal logic to distribute work evenly among its workers, but you can configure it manually to make sure the work is distributed evenly. Start by reviewing the key cqlsh parameters:
- DELIMITER – If you used a delimiter other than a comma, you can set this parameter, which defaults to comma.
- INGESTRATE – The target number of rows cqlsh COPY attempts to process per second. If unset, it defaults to 100,000.
- NUMPROCESSES – The number of child worker processes that cqlsh creates to process parallel requests. You can set this to, at most, num_cores - 1, where num_cores is the number of processing cores on the host running cqlsh.
- MINBATCHSIZE and MAXBATCHSIZE – When a worker receives a chunk of work, the batch size dictates how many inserts make up one chunk of work. If unset, cqlsh uses batches of 20, which means one chunk equals 20 inserts.
- CHUNKSIZE – The size of the work unit that passes to the child worker. By default, it is set to 1,000, which means a worker receives chunks of work (in terms of rows) as CHUNKSIZE * MAXBATCHSIZE. By default, each worker is sent 20,000 rows.
- MAXATTEMPTS – The maximum number of times to retry a failed worker chunk. After the maximum number of attempts is reached, the failed records are written to a new CSV file that you can run again later after investigating the failure.
Set INGESTRATE based on the number of WCUs you provisioned to the target table. The INGESTRATE of the COPY command isn’t a limit; it’s a target average. This means it can (and often does) burst above the number you set. To allow for bursts and make sure that enough capacity is in place to handle the data load requests, set INGESTRATE to 90% of the table’s write capacity:

INGESTRATE = WCUs * .90
Next, set the NUMPROCESSES parameter to equal one less than the number of cores on your system. For example, if you’re running the data load from a host with 16 computing cores, set NUMPROCESSES = 15.
Each process creates a worker, and each worker establishes a connection to Amazon Keyspaces. Amazon Keyspaces can support up to 3,000 CQL requests per second on every connection, which means you have to make sure that each worker is processing fewer than 3,000 requests per second. As with INGESTRATE, the workers often burst above the number you set and aren’t limited by clock seconds. Therefore, to tolerate bursts, set your cqlsh parameters to target each worker processing 2,500 requests per second. To calculate the amount of work distributed to a worker, divide INGESTRATE by NUMPROCESSES. If INGESTRATE / NUMPROCESSES is over 2,500, lower the INGESTRATE to make this formula true: INGESTRATE / NUMPROCESSES <= 2,500.
For this post, assume NUMPROCESSES is set to 4 (the default), so there are four workers available to process your data load. cqlsh uses the formula CHUNKSIZE * MAXBATCHSIZE to create chunks of work (INSERT statements) to distribute to workers. cqlsh doesn’t distribute work evenly among the workers, so you need to set the CHUNKSIZE, MAXBATCHSIZE, and INGESTRATE so that workers don’t sit idle.
The following code mostly uses defaults and has idle workers:
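For illustration (the table and file names are placeholders), the following command sets only INGESTRATE and NUMPROCESSES and leaves CHUNKSIZE and MAXBATCHSIZE at their defaults:

```
COPY catalog.products FROM './keyspace.table.csv'
WITH HEADER = true
AND INGESTRATE = 10000
AND NUMPROCESSES = 4;
```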
In the preceding code example, the first worker gets all the work, and the others sit idle. This is because CHUNKSIZE (1,000) * MAXBATCHSIZE (20) = 20,000, which is larger than INGESTRATE (10,000). With these settings, each worker is configured to process chunks of 20,000 rows. cqlsh is configured to pull 10,000 rows at a time, based on the INGESTRATE setting. When cqlsh pulls 10,000 rows back from the CSV file, the first worker asks for up to 20,000 rows, so cqlsh sends all 10,000 rows to the first worker and doesn’t leave any work for the remaining workers. In addition to having an unbalanced workload, the first worker is well above the 3,000 requests per second maximum.
You can evenly distribute the load across the workers and keep each worker at the optimal 2,500 requests per second rate by changing the input parameters. See the following code:
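A sketch of the same command with the rebalanced settings (again, the table and file names are placeholders):

```
COPY catalog.products FROM './keyspace.table.csv'
WITH HEADER = true
AND INGESTRATE = 10000
AND NUMPROCESSES = 4
AND MAXBATCHSIZE = 25
AND CHUNKSIZE = 100;
```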
To optimize network traffic utilization during the data load, pick a value for MAXBATCHSIZE close to the maximum value of 30. By changing CHUNKSIZE to 100 and MAXBATCHSIZE to 25, each worker now receives 2,500 rows (100 * 25), which means the 10,000 rows are spread evenly among the four workers (10,000 / 2,500 = 4).
To summarize, use the following formulas when setting cqlsh COPY parameters:

- INGESTRATE = write_capacity_units * .90
- NUMPROCESSES = 1-16 or num_of_cores - 1
- INGESTRATE / NUMPROCESSES <= 2,500 (this must be a true statement)
- MAXBATCHSIZE <= 30 (defaults to 20; Amazon Keyspaces accepts batches up to 30)
- CHUNKSIZE = (INGESTRATE / NUMPROCESSES) / MAXBATCHSIZE
Now that you have calculated NUMPROCESSES, INGESTRATE, and CHUNKSIZE, you’re ready to load your data.
Running the cqlsh COPY FROM command
To run the cqlsh COPY FROM command, complete the following steps. A code sketch that ties the steps together follows the list:
- Connect to Amazon Keyspaces via cqlsh.
- Switch to your keyspace.
- Set the write consistency to LOCAL_QUORUM (for data durability, Amazon Keyspaces doesn’t allow other write consistency settings).
- Prepare your cqlsh COPY FROM syntax and run it. cqlsh echoes back all the settings you configured.
- Make sure the settings match your input.
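The following sketch ties the steps together; the keyspace, table, and file names are placeholders, and the COPY options are the values worked out earlier in this post:

```
-- Switch to the target keyspace (name is a placeholder)
USE catalog;

-- Amazon Keyspaces accepts only LOCAL_QUORUM for writes
CONSISTENCY LOCAL_QUORUM;

-- Run COPY FROM with the tuned settings
COPY catalog.products FROM './keyspace.table.csv'
WITH HEADER = true
AND INGESTRATE = 10000
AND NUMPROCESSES = 4
AND MAXBATCHSIZE = 25
AND CHUNKSIZE = 100;
```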
cqlsh prints out the number of rows it transferred and the current and average rate. See the following code:
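While the load runs, cqlsh reports progress with lines similar to the following (the numbers are illustrative):

```
Processed: 57000 rows; Rate: 6527 rows/s; Avg. rate: 6569 rows/s
```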
Then, cqlsh processes your file until it finishes and provides a summary of the data load statistics (number of files read, runtime, and skipped rows). See the following code:
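The summary looks similar to the following (again, the numbers are illustrative):

```
1000000 rows imported from 1 files in 1 hour, 8 minutes, and 8.321 seconds (0 skipped).
```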
You have now loaded your data in Amazon Keyspaces.
Cleaning up
Now that you have transferred your data, adjust the capacity mode settings to match your application’s regular traffic patterns. You incur charges at the hourly rate for your provisioned capacity until you change it.
Check the directory for your source CSV file. If any rows were skipped during the data load, they’re written to a new CSV file named import_yourcsvfilename.err.timestamp.csv. If that file exists, and it has data in it, these rows didn’t transfer to Amazon Keyspaces. To retry these rows, you can rerun the process. If you encountered errors for other reasons, adjust the data before retrying.
Common errors
The most common reasons why rows aren’t loaded are capacity errors and parsing errors.
If the cqlsh client receives three consecutive errors of any type from a server, it reports the error and then attempts to re-establish the connection.
Resolving capacity errors
The following code is a capacity error (WriteTimeout):
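The exact text depends on your cqlsh and driver versions, but a throughput capacity error reported by the COPY workers generally has this shape (the values shown are illustrative):

```
Failed to import 500 rows: WriteTimeout - Error from server: code=1100 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'received_responses': 0, 'required_responses': 2, 'consistency': 'LOCAL_QUORUM'}, will retry later, attempt 1 of 100
```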
Because Apache Cassandra is cluster-based software that is designed to run on a fleet of nodes, it doesn’t have exception messages related to serverless features such as throughput capacity. Most drivers only understand the error codes that are available in Apache Cassandra, so Amazon Keyspaces uses that same set of error codes to maintain compatibility.
For example, Amazon Keyspaces uses the ReadTimeout and WriteTimeout exceptions to indicate when a write request fails due to insufficient throughput capacity. To help diagnose insufficient capacity exceptions, Amazon Keyspaces publishes WriteThrottleEvents and ReadThrottleEvents metrics in Amazon CloudWatch.
To resolve insufficient capacity errors during a data load, lower the write rate per worker or the total ingest rate and retry the rows.
Resolving parse errors
The following code is a parse error (ParseError):
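The exact message varies with the data and cqlsh version, but a parse error generally has this shape (the values shown are illustrative):

```
Failed to import 1 rows: ParseError - Invalid row length 8 should be 9, given up without retries
```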
Make sure the data you import matches your table schema. cqlsh writes rows with parsing errors to a CSV file. Try taking the data from that file and using an INSERT statement for that single row so that you can better see the problem.
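For example, a sketch using the hypothetical table from earlier in this post:

```
INSERT INTO catalog.products (id, name, price, created_at)
VALUES (101, 'Example product', 19.99, '2020-06-01 00:00:00+0000');
```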
Summary
This post demonstrated how to configure cqlsh and Amazon Keyspaces to load data with the cqlsh COPY command, and discussed best practices for performing the data transfer. For more information, see What Is Amazon Keyspaces (for Apache Cassandra)?
If you have any questions or comments, please leave your thoughts in the comments section.
About the Authors
Steve Mayszak is a Software Development Manager with Amazon Web Services.
Michael Raney is a Solutions Architect with Amazon Web Services.