AWS Database Blog

Loading data into Amazon Keyspaces with cqlsh

The Cassandra Query Language Shell (cqlsh) is an open-source command line shell that you can use to run CQL commands and perform database administrative tasks, such as creating and modifying tables. You can use cqlsh to get started with Amazon Keyspaces (for Apache Cassandra)—a scalable, highly available, managed Cassandra-compatible database—by loading data from a CSV file into your Amazon Keyspaces tables. For more information, see What Is Amazon Keyspaces (for Apache Cassandra)?

This post walks you through how to use cqlsh to load data into an Amazon Keyspaces table by using the cqlsh COPY command. It also shares best practices for preparing the data and how to tune the performance of the data transfer through the COPY command parameters. Finally, this post discusses how to configure the read/write throughput settings of Amazon Keyspaces tables to optimize the data load process.

Prerequisites

Before you begin, you need an AWS account for your Amazon Keyspaces resources. Make sure that you can connect programmatically and that cqlsh is set up correctly. For instructions, see Connecting Programmatically to Amazon Keyspaces (for Apache Cassandra).

To work through the examples in this post, you need a CSV file with data. This post refers to that CSV file as export_keyspace_table.csv, but you can replace it with another name. Make sure the column headers in the CSV source data file match the column names of the target table. If they don’t match, you have to map the CSV headers to the table columns, which this post doesn’t cover.

The following code creates matching table columns:

CREATE TABLE ks.blogexample ("id" int PRIMARY KEY, "columnvalue" text);

cat example.csv 
id,columnvalue
1,value1
2,value2
3,value3

Creating a source CSV file from cqlsh COPY TO

If your source data is in an Apache Cassandra database, you can use the cqlsh COPY TO command to generate a CSV file. See the following code:

cqlsh localhost 9042 -u "cassandra" -p "cassandra" --execute "COPY keyspace.table TO 'export_keyspace_table.csv' WITH HEADER=true"

Using a CSV from a different source

If you’re using a different data source to create a CSV, make sure the first row contains the column names and that the data is delimited with a comma (a comma delimiter isn’t required, but it allows you to use the default settings in cqlsh COPY).

Additionally, make sure all the data values are valid Cassandra data types. For more information, see Data Types on the Apache Cassandra website.
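
As a quick sanity check, for example, you can confirm that every row has the same number of comma-delimited fields as the header. The following one-liner is a minimal sketch; it assumes your file is named export_keyspace_table.csv and that no values contain embedded commas:

# Confirm every row has the same number of comma-delimited fields as the header.
# Assumes no values contain embedded commas (quoted fields aren't handled here).
awk -F, 'NR==1{cols=NF} NF!=cols{printf("line %d has %d fields, expected %d\n", NR, NF, cols); bad=1} END{if(bad!=1) printf("all %d lines have %d fields\n", NR, cols)}' export_keyspace_table.csv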

Creating the target table

If Apache Cassandra is the original data source, a simple way to create the Amazon Keyspaces tables (and make sure the CSV headers match) is to generate the CREATE TABLE statement from the source table. See the following code:

cqlsh localhost 9042 -u "cassandra" -p "cassandra" --execute "DESCRIBE TABLE keyspace.table;"

Create the keyspace and table in Amazon Keyspaces if you haven’t already done so. You use this table as the target of the data load.
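
For example, the following is a minimal sketch based on the sample table from earlier in this post; the keyspace name (ks) and the SingleRegionStrategy replication class are illustrative, so adjust them for your environment:

CREATE KEYSPACE IF NOT EXISTS ks
    WITH REPLICATION = {'class': 'SingleRegionStrategy'};

CREATE TABLE IF NOT EXISTS ks.blogexample (
    "id" int PRIMARY KEY,
    "columnvalue" text
);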

Preparing the data

To prepare the source data for an efficient transfer, first randomize it. Then analyze the data to determine the cqlsh parameter values and the table settings you need.

Randomizing the data

The cqlsh COPY FROM command reads and writes data in the same order that it appears in the CSV file. If you use the cqlsh COPY TO command to create the source file, the data is written in key-sorted order in the CSV. Internally, Amazon Keyspaces partitions data using partition keys. Although Amazon Keyspaces has built-in logic to help load balance requests for the same partition key, loading the data is faster and more efficient if you randomize the order because you can take advantage of the built-in load balancing of writing to different partitions.

To spread the writes across the partitions evenly, you must randomize the data in the source file. You can write an application to do this or use an open-source tool, such as shuf. The shuf utility is freely available on Linux distributions; on macOS, by installing coreutils with Homebrew (which installs it as gshuf); and on Windows, by using Windows Subsystem for Linux (WSL).

To randomize the source file on Linux or Windows, enter the following code:

shuf -o keyspace.table.csv < export_keyspace_table.csv

To randomize the source file on macOS, enter the following code:

gshuf -o keyspace.table.csv < export_keyspace_table.csv

shuf rewrites the data to a new CSV file called keyspace.table.csv. You can delete the export_keyspace_table.csv file; you no longer need it.

Analyzing the data

Determine the average and maximum row size by analyzing the data. You do this for the following two reasons:

  • The average row size helps determine the amount of data to transfer.
  • You can make sure all the rows are less than 1 MB in size, which is the maximum row size in Amazon Keyspaces.

This quota refers to row size, not partition size. Unlike Apache Cassandra, Amazon Keyspaces partitions can be virtually unbounded in size. Partition keys and clustering columns also require additional storage for indexing, which you must add to the raw size of the rows. For more information, see Quotas.

Handling rows greater than 1 MB isn’t a focus for this post, but if you have such rows, you can explore one of these options:

  • Split the data into smaller rows (you may need to use multiple tables depending on your access patterns)
  • Store the data in an object store, such as Amazon S3, and keep a reference to the object in Amazon Keyspaces (see the sketch following this list)
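
If you choose the second option, the Amazon Keyspaces table only needs to hold a pointer to the object. The following is a minimal sketch with illustrative table and column names:

CREATE TABLE ks.blog_large_objects (
    "id" int PRIMARY KEY,
    "s3_uri" text
);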

The following code uses AWK to analyze a CSV file and print the average and maximum row size:

awk -F, 'BEGIN {samp=10000;max=-1;}{if(NR>1){len=length($0);t+=len;avg=t/(NR-1);max=(len>max ? len : max)}}NR==samp{exit}END{printf("{lines: %d, average: %d bytes, max: %d bytes}\n",NR,avg,max);}' keyspace.table.csv

You receive output similar to the following (the sample covers the first 10,000 lines):

{lines: 10000, average: 123 bytes, max: 225 bytes}

Setting table throughput capacity mode

With Amazon Keyspaces, you only pay for the resources you use. Amazon Keyspaces offers two throughput capacity modes: on-demand and provisioned. With on-demand mode, you pay based on the actual reads and writes your applications perform. With provisioned capacity, you can optimize the cost of reads and writes by setting in advance how much data your applications can read and write per second from tables. You can use either mode to load data into your tables. For more information, see Read/Write Capacity Mode.

This post walks you through how to tune cqlsh to load data within a set time range. Because you know how many reads and writes you perform in advance, use provisioned capacity mode. After you finish the data transfer, you should set the capacity mode to match your application’s traffic patterns.

With provisioned capacity mode, you specify how much read and write capacity you want to provision to your table in advance. Write capacity is billed hourly and metered in write capacity units (WCUs). Each WCU is enough write capacity to support writing 1 KB of data per second. When you load the data, the write rate must be under the max WCUs (parameter: write_capacity_units) set on the target table. By default, you can provision up to 40,000 WCUs to a table and 80,000 WCUs across all the tables in your account. If you need additional capacity, you can request a quota increase through AWS Support.

Calculating the average number of WCUs required for an insert

Inserting 1 KB of data per second requires 1 WCU. If your CSV file has 360,000 rows, and you want to load all the data in 1 hour, you must write 100 rows per second (360,000 rows / 60 minutes / 60 seconds = 100 rows per second). If each row has up to 1 KB of data, to insert 100 rows per second, you must provision 100 WCUs to your table. If each row has 1.5 KB of data, you need two WCUs to insert one row per second; therefore, to insert 100 rows per second, you must provision 200 WCUs.

To determine how many WCUs you need to insert one row per second, divide the average row size in bytes by 1024 and round up to the nearest whole number.

For example, if the average row size is 3000 bytes, you need three WCUs to insert one row per second:

ROUNDUP(3000 / 1024) = ROUNDUP(2.93) = 3 WCUs.
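
The same calculation in shell form, as a sketch (the 3,000-byte row size is the example value from above):

# WCUs needed to insert one row per second = ceiling(average row size / 1024)
avg_row_bytes=3000
echo $(( (avg_row_bytes + 1023) / 1024 ))    # prints 3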

Calculating data load time and capacity

Now that you know the average size and number of rows in your CSV file, you can calculate how many WCUs you need to load the data in a given amount of time, and the approximate time it takes to load all the data in your CSV using different WCU settings.

For example, if each row in your file is 1 KB and you have 1,000,000 rows in your CSV file, to load the data in 1 hour, you need to provision at least 278 WCUs to your table for that hour:

1,000,000 rows * 1 KB = 1,000,000 KB
1,000,000 KB / 3,600 seconds = 277.8 KB/second = 278 WCUs
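
As a sketch, you can script the same arithmetic; the row count, average row size, and load window below are the example values from this section:

rows=1000000           # rows in the CSV file
avg_row_bytes=1024     # average row size (1 KB in this example)
load_seconds=3600      # target load window (1 hour)

wcus_per_row=$(( (avg_row_bytes + 1023) / 1024 ))
total_wcus=$(( (rows * wcus_per_row + load_seconds - 1) / load_seconds ))
echo "Provision at least $total_wcus WCUs"    # prints 278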

Configuring provisioned capacity settings

You can set a table’s write capacity settings when you create the table or by using the ALTER TABLE command. The syntax for altering a table’s provisioned capacity settings with the ALTER TABLE command is as follows:

ALTER TABLE keyspace.table WITH custom_properties={'capacity_mode':{'throughput_mode': 'PROVISIONED', 'read_capacity_units': 100, 'write_capacity_units': 278}} ; 

For more information, see ALTER TABLE in the Amazon Keyspaces Developer Guide.
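
If you’re creating a new table rather than altering an existing one, you can set the same properties at creation time. The following sketch mirrors the placeholder names and capacity values from the ALTER TABLE example above; substitute your own keyspace, table, columns, and numbers:

CREATE TABLE keyspace.table (
    "id" int PRIMARY KEY,
    "columnvalue" text
) WITH CUSTOM_PROPERTIES = {
    'capacity_mode': {
        'throughput_mode': 'PROVISIONED',
        'read_capacity_units': 100,
        'write_capacity_units': 278
    }
};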

Configuring cqlsh COPY settings

Next, determine the parameter values for cqlsh COPY. The cqlsh COPY command reads the CSV file you prepared earlier and inserts the data into Amazon Keyspaces using CQL. cqlsh divides up the rows and distributes the INSERT operations among a set of workers. Each worker establishes a connection with Amazon Keyspaces and sends INSERT requests along this channel. The cqlsh COPY command doesn’t have internal logic to distribute work evenly among its workers, but you can configure it manually to make sure the work is distributed evenly. Start by reviewing the key cqlsh parameters:

  • DELIMITER – If you used a delimiter other than a comma, you can set this parameter, which defaults to comma.
  • INGESTRATE – The target number of rows cqlsh COPY attempts to process per second. If unset, it defaults to 100,000.
  • NUMPROCESSES – The number of child worker processes that cqlsh creates to process parallel requests. You can set this to, at most, num_cores - 1, where num_cores is the number of processing cores on the host running cqlsh.
  • MINBATCHSIZE and MAXBATCHSIZE – When a worker receives a chunk of work, the batch size dictates how many inserts make up one batch. If unset, cqlsh uses batches of 20, which means one batch equals 20 inserts.
  • CHUNKSIZE – The size of the work unit (in rows) that passes to a child worker. By default, it is set to 1,000, so a worker receives chunks of work of CHUNKSIZE * MAXBATCHSIZE rows; with the defaults, each worker is sent 20,000 rows.
  • MAXATTEMPTS – The maximum number of times to retry a failed worker chunk. After the maximum number of attempts is reached, the failed records are written to a new CSV file that you can run again later after investigating the failure.

Set INGESTRATE based on the number of WCUs you provisioned to the target table. The INGESTRATE of the COPY command isn’t a limit; it’s a target average. This means it can (and often does) burst above the number you set. To allow for bursts and make sure that enough capacity is in place to handle the data load requests, set INGESTRATE to 90% of the table’s write capacity:

INGESTRATE = WCUs * .90

Next, set the NUMPROCESSES parameter to equal one less than the number of cores on your system. For example, if you’re running the data load from a host with 16 computing cores, set NUMPROCESSES = 15.

Each process creates a worker, and each worker establishes a connection to Amazon Keyspaces. Amazon Keyspaces can support up to 3,000 CQL requests per second on every connection, which means you have to make sure that each worker is processing fewer than 3,000 requests per second. As with INGESTRATE, the workers often burst above the number you set and aren’t limited by clock seconds. Therefore, to tolerate bursts, set your cqlsh parameters to target each worker processing 2,500 requests per second. To calculate the amount of work distributed to a worker, divide INGESTRATE by NUMPROCESSES. If INGESTRATE / NUMPROCESSES is over 2,500, lower the INGESTRATE to make this formula true: INGESTRATE / NUMPROCESSES <= 2,500.

For this post, assume NUMPROCESSES is set to 4 (the default), so there are four workers available to process your data load. cqlsh uses the formula CHUNKSIZE * MAXBATCHSIZE to create chunks of work (INSERT statements) to distribute to workers. cqlsh doesn’t distribute work evenly among the workers, so you need to set the CHUNKSIZE, MAXBATCHSIZE, and INGESTRATE so that workers don’t sit idle.

The following code mostly uses defaults and has idle workers:

INGESTRATE = 10,000
NUMPROCESSES = 4 (default)
CHUNKSIZE = 1,000 (default)
MAXBATCHSIZE = 20 (default)
Work Distribution:
Connection 1 / Worker 1 : 10,000 Requests per second
Connection 2 / Worker 2 : 0 Requests per second
Connection 3 / Worker 3 : 0 Requests per second
Connection 4 / Worker 4 : 0 Requests per second

In the preceding code example, the first worker gets all the work, and the others sit idle. This is because CHUNKSIZE (1,000) * MAXBATCHSIZE (20) = 20,000, which is larger than INGESTRATE (10,000). With these settings, each worker is configured to process chunks of 20,000 rows. cqlsh is configured to pull 10,000 rows at a time, based on the INGESTRATE setting. When cqlsh pulls 10,000 rows back from the CSV file, the first worker asks for up to 20,000 rows, so cqlsh sends all 10,000 rows to the first worker, and doesn’t leave any work for the remaining workers. In addition to having an unbalanced workload, the first worker is well above the 3,000 requests per second maximum.

You can evenly distribute the load across the workers and keep each worker at the optimal 2,500 requests per second rate by changing the input parameters. See the following code:

INGESTRATE = 10,000
NUMPROCESSES = 4 (default)
CHUNKSIZE = 100
MAXBATCHSIZE = 25
Work Distribution:
Connection 1 / Worker 1 : 2,500 Requests per second
Connection 2 / Worker 2 : 2,500 Requests per second
Connection 3 / Worker 3 : 2,500 Requests per second
Connection 4 / Worker 4 : 2,500 Requests per second

To optimize network traffic utilization during the data load, pick a value for MAXBATCHSIZE close to the maximum value of 30. By changing CHUNKSIZE to 100 and MAXBATCHSIZE to 25, each worker now receives 2,500 rows (100 * 25), which means the 10,000 rows are spread evenly among the four workers (10,000 / 2500 = 4).

To summarize, use the following formulas when setting cqlsh COPY parameters:

  • INGESTRATE = write_capacity_units * .90
  • NUMPROCESSES = num_of_cores - 1 (a value from 1-16)
  • INGESTRATE / NUMPROCESSES <= 2,500 (this must be a true statement)
  • MAXBATCHSIZE <= 30 (defaults to 20; Amazon Keyspaces accepts batches up to 30)
  • CHUNKSIZE = (INGESTRATE / NUMPROCESSES) / MAXBATCHSIZE

Now that you have calculated NUMPROCESSES, INGESTRATE, and CHUNKSIZE, you’re ready to load your data.
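
For example, the following shell sketch applies these formulas end to end; the provisioned WCU count (40,000) and core count (16) are illustrative inputs chosen to match the sample run shown in the next section:

wcus=40000        # write_capacity_units provisioned on the target table
num_cores=16      # cores on the host running cqlsh

ingestrate=$(( wcus * 90 / 100 ))            # 90% of the table's write capacity
numprocesses=$(( num_cores - 1 ))
per_worker=$(( ingestrate / numprocesses ))
if [ "$per_worker" -gt 2500 ]; then
  ingestrate=$(( numprocesses * 2500 ))      # keep each worker at or below 2,500 requests per second
fi
maxbatchsize=20
chunksize=$(( ingestrate / numprocesses / maxbatchsize ))

echo "INGESTRATE=$ingestrate NUMPROCESSES=$numprocesses MAXBATCHSIZE=$maxbatchsize CHUNKSIZE=$chunksize"
# Prints: INGESTRATE=36000 NUMPROCESSES=15 MAXBATCHSIZE=20 CHUNKSIZE=120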

Running the cqlsh COPY FROM command

To run the cqlsh COPY FROM command, complete the following steps:

  1. Connect to Amazon Keyspaces via cqlsh.
  2. Switch to your keyspace with the following code:
    use keyspace_name;
  3. Set write consistency to LOCAL_QUORUM (for data durability, Amazon Keyspaces doesn’t allow other write consistency settings). See the following code:
    CONSISTENCY LOCAL_QUORUM;
  4. Prepare your cqlsh COPY FROM syntax. See the following example code:
    COPY yourtablename FROM './keyspace.table.csv' WITH HEADER=true AND INGESTRATE=<calculated ingestrate> AND NUMPROCESSES=<calculated numprocess> AND MAXBATCHSIZE=20 AND CHUNKSIZE=<calculated chunksize>;

    cqlsh echoes back all the settings you configured.

  5. Make sure the settings match your input. See the following code:
    Reading options from the command line: {'chunksize': '120', 'header': 'true', 'ingestrate': '36000', 'numprocesses': '15', 'maxbatchsize': '20'}
    Using 15 child processes

    cqlsh prints out the number of rows it transferred and the current and average rate. See the following code:

    Processed: 57834 rows; Rate: 6561 rows/s; Avg. rate: 31751 rows/s

    Then, cqlsh processes your file until it finishes and provides a summary of the data load statistics (number of files read, runtime, and skipped rows). See the following code:

    15556824 rows imported from 1 files in 8 minutes and 8.321 seconds (0 skipped).

    You have now loaded your data into Amazon Keyspaces.

Cleaning up

Now that you have transferred your data, adjust the capacity mode settings to match your application’s regular traffic patterns. You continue to incur charges at the hourly rate for your provisioned capacity until you change it.
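
For example, if your application’s traffic is spiky or unpredictable, you might switch the table to on-demand mode after the load. The following statement is a sketch that reuses the placeholder table name from earlier:

ALTER TABLE keyspace.table
    WITH CUSTOM_PROPERTIES = {'capacity_mode': {'throughput_mode': 'PAY_PER_REQUEST'}};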

Check the directory for your source CSV file. If any rows were skipped during the data load, they’re written to a new CSV file named import_yourcsvfilename.err.timestamp.csv. If that file exists and has data in it, those rows didn’t transfer to Amazon Keyspaces. To retry those rows, you can rerun the process. If you encountered errors for other reasons, adjust the data before retrying.

Common errors

The most common reasons why rows aren’t loaded are capacity errors and parsing errors.

If the cqlsh client receives three consecutive errors of any type from a server, you see the following code:

Failed to import 1 rows: NoHostAvailable - , will retry later, attempt 3 of 100

The client then attempts to re-establish a connection.

Resolving capacity errors

The following code is a capacity error (WriteTimeout):

Failed to import 1 rows: WriteTimeout - Error from server: code=1100 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'received_responses': 0, 'required_responses': 2, 'write_type': 'SIMPLE', 'consistency': 'LOCAL_QUORUM'}, will retry later, attempt 1 of 100

Because Apache Cassandra is cluster-based software that is designed to run on a fleet of nodes, it doesn’t have exception messages related to serverless features such as throughput capacity. Most drivers only understand the error codes that are available in Apache Cassandra, so Amazon Keyspaces uses that same set of error codes to maintain compatibility.

For example, Amazon Keyspaces uses the ReadTimeout and WriteTimeout exceptions to indicate when a read or write request fails due to insufficient throughput capacity. To help diagnose insufficient capacity exceptions, Amazon Keyspaces publishes WriteThrottleEvents and ReadThrottleEvents metrics in Amazon CloudWatch.
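
For example, you can check how often writes were throttled during the load with the AWS CLI. The following sketch assumes the AWS/Cassandra CloudWatch namespace with Keyspace and TableName dimensions; the keyspace, table, and time window values are illustrative:

aws cloudwatch get-metric-statistics \
    --namespace "AWS/Cassandra" \
    --metric-name WriteThrottleEvents \
    --dimensions Name=Keyspace,Value=ks Name=TableName,Value=blogexample \
    --start-time 2020-04-01T00:00:00Z \
    --end-time 2020-04-01T01:00:00Z \
    --period 60 \
    --statistics Sum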

To resolve insufficient capacity errors during a data load, lower the write rate per worker or the total ingest rate and retry the rows.

Resolving parse errors

The following code is a parse error (ParseError):

Failed to import 1 rows: ParseError - Invalid ...

Make sure the data you import matches your table schema. cqlsh writes rows with parsing errors to a CSV file. Try taking the data from that file and using an INSERT statement for that single row so that you can better see the problem.
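
For example, using the sample table from the beginning of this post, inserting the suspect row by hand often surfaces the exact mismatch (the values shown are illustrative):

INSERT INTO ks.blogexample ("id", "columnvalue")
VALUES (1, 'value1');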

Summary

This post demonstrated how to configure cqlsh and Amazon Keyspaces to load data with the cqlsh COPY command, and discussed best practices for performing the data transfer. For more information, see What Is Amazon Keyspaces (for Apache Cassandra)?

If you have any questions or comments, please leave your thoughts in the comments section.

About the Authors

Steve Mayszak is a Software Development Manager with Amazon Web Services.

Michael Raney is a Solutions Architect with Amazon Web Services.