Apache Cassandra

Apache Cassandra is an open-source, distributed database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure. Cassandra is a non-relational, highly scalable, eventually consistent, distributed data store based on the column-family model, and it uses a peer-to-peer architecture.

  •  Originally developed at Facebook
  •  Written in Java
  •  Open source
  •  The name comes from Greek mythology
  •  Cassandra combines concepts from Google’s Bigtable with the distributed hash table (DHT) design of Amazon’s Dynamo

Now, let’s discuss what has changed with the introduction of NoSQL:

  • Massive data volumes
  • Extreme query load
  • Flexible schema evolution: the schema is never fixed and can evolve over time
  • Schema changes can be introduced into the system gradually

Cassandra falls under the columnar or extensible-record category, where each key is associated with many columns. It still uses tables, but there are no joins: Cassandra does not support joins or subqueries, except for batch analysis via Hadoop. Instead, Cassandra emphasizes denormalization through features such as collections, and it organizes data around column families and partitions rather than the row-oriented layout of a traditional relational database.
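
As a rough CQL sketch of this denormalized style, a hypothetical users table can keep a set of email addresses in a collection column instead of joining to a separate table (the table and column names here are illustrative only):

CREATE TABLE users (
    username text PRIMARY KEY,
    full_name text,
    emails set<text>    -- collection column: related values live with the row, no join needed
);

INSERT INTO users (username, full_name, emails)
VALUES ('jdoe', 'Jane Doe', {'jdoe@example.com', 'jane@example.com'});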

What is CAP Theorem?

Also known as Brewer’s theorem, the CAP theorem states that it is impossible for a distributed computer system to simultaneously provide all three of the following guarantees:

  • Consistency (all nodes see the same data at the same time)
  • Availability (a guarantee that every request receives a response about whether it was successful or failed)
  • Partition Tolerance (the system continues to operate despite arbitrary message loss)

According to this theorem, a distributed system can satisfy any two of these guarantees at the same time, but not all three.

In practice, network partitions are unavoidable: at some point parts of the system will be unable to communicate with each other. The system should remain operational during a partition, and each distributed system has to decide whether, while operating in that state, it holds on to availability or to consistency.

Cassandra offers a tunable consistency model: you can configure it to favor either availability or consistency, and the required consistency level can be tuned per operation, so Cassandra can work in both modes.
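
As a small illustration, the consistency level can be chosen per request; in cqlsh the CONSISTENCY command sets the level used by the statements that follow (the shop.orders table is hypothetical):

-- Favor consistency: a majority of replicas must respond
CONSISTENCY QUORUM;
SELECT * FROM shop.orders WHERE order_id = 1234;

-- Favor availability: a single replica is enough to answer
CONSISTENCY ONE;
SELECT * FROM shop.orders WHERE order_id = 1234;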

Because of this consistency model, Cassandra is generally a poor fit for systems that need strict transactional guarantees, such as banking, payments, or other OLTP workloads; it is a much better fit for high-volume, availability-critical applications such as social media.

How does Cassandra compare to HBase?

HBase is a NoSQL, distributed database model that is included in the Apache Hadoop Project. It runs on top of the Hadoop Distributed File System (HDFS). HBase is designed for data lake use cases and is not typically used for web and mobile applications. Cassandra, by contrast, offers the availability and performance necessary for developing always-on applications.

Combining Cassandra and Hadoop

Today’s organizations have two data needs: a database devoted to online operations and the analysis of “hot” data generated by web, mobile, and IoT applications, and a batch-oriented big data platform that supports the processing of vast amounts of “cold”, unstructured historical data. By tightly integrating Cassandra and Hadoop, both needs can be served.

While Cassandra works very well as a highly fault-tolerant backend for online systems, it is not as analytics-friendly as Hadoop. Moving data off Cassandra into Hadoop and HDFS is a complicated and time-consuming process, so deploying Hadoop on top of Cassandra makes it possible to analyze data in Cassandra without first moving it into Hadoop. This gives organizations a convenient way to get operational analytics and reporting, in near real time, from relatively large amounts of data residing in Cassandra. Armed with faster and deeper big data insights, organizations that leverage both Hadoop and Cassandra can better meet the needs of their customers and gain a stronger edge over their competitors.

Architecture in brief

Cassandra is designed to handle big data workloads across multiple nodes with no single point of failure. Its architecture is based on the understanding that system and hardware failures can and do occur. Cassandra addresses the problem of failures by employing a peer-to-peer distributed system across homogeneous nodes where data is distributed among all nodes in the cluster. Each node frequently exchanges state information about itself and other nodes across the cluster using a peer-to-peer gossip communication protocol. A sequentially written commit log on each node captures write activity to ensure data durability. Data is then indexed and written to an in-memory structure, called a memtable, which resembles a write-back cache. Each time the memory structure is full, the data is written to disk in an SSTable data file. All writes are automatically partitioned and replicated throughout the cluster. Cassandra periodically consolidates SSTables using a process called compaction, discarding obsolete data marked for deletion with a tombstone. To ensure all data across the cluster stays consistent, various repair mechanisms are employed.

Cassandra is a partitioned row store database, where rows are organized into tables with a required primary key. Cassandra’s architecture allows any authorized user to connect to any node in any datacenter and access data using the CQL language. For ease of use, CQL uses a similar syntax to SQL and works with table data. Developers can access CQL through cqlsh, DevCenter, and via drivers for application languages. Typically, a cluster has one keyspace per application composed of many different tables.
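
A minimal sketch of what this looks like in cqlsh, assuming a made-up cycling keyspace and table:

CREATE KEYSPACE IF NOT EXISTS cycling
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS cycling.cyclist_name (
    id uuid PRIMARY KEY,    -- every table requires a primary key
    lastname text,
    firstname text
);

INSERT INTO cycling.cyclist_name (id, lastname, firstname)
VALUES (uuid(), 'Matthews', 'Michael');

SELECT * FROM cycling.cyclist_name;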

Client read or write requests can be sent to any node in the cluster. When a client connects to a node with a request, that node serves as the coordinator for that particular client operation. The coordinator acts as a proxy between the client application and the nodes that own the data being requested. The coordinator determines which nodes in the ring should get the request based on how the cluster is configured.

Key structures

  • Node: Where you store your data. It is the basic infrastructure component of Cassandra.
  • Datacenter: A collection of related nodes. A datacenter can be a physical or a virtual datacenter. Different workloads should use separate datacenters, either physical or virtual. Replication is set by datacenter. Using separate datacenters prevents Cassandra transactions from being impacted by other workloads and keeps requests close to each other for lower latency. Depending on the replication factor, data can be written to multiple datacenters. Datacenters must never span physical locations.
  • Cluster: A cluster contains one or more datacenters. It can span physical locations.
  • Commit log: All data is written first to the commit log for durability. After all its data has been flushed to SSTables, it can be archived, deleted, or recycled.
  • SSTable: A sorted string table (SSTable) is an immutable data file to which Cassandra writes memtables periodically. SSTables are append only and stored on disk sequentially and maintained for each Cassandra table.
  • CQL Table: A collection of ordered columns fetched by table row. A table consists of columns and has a primary key.

Key components for configuring Cassandra

  • Gossip

A peer-to-peer communication protocol to discover and share location and state information about the other nodes in a Cassandra cluster. Gossip information is also persisted locally by each node to use immediately when a node restarts.

  • Partitioner

A partitioner determines which node will receive the first replica of a piece of data, and how to distribute other replicas across other nodes in the cluster. Each row of data is uniquely identified by a primary key, which may be the same as its partition key, but which may also include other clustering columns. A partitioner is a hash function that derives a token from the primary key of a row. The partitioner uses the token value to determine which nodes in the cluster receive the replicas of that row. The Murmur3Partitioner is the default partitioning strategy for new Cassandra clusters and the right choice for new clusters in almost all cases.

You must set the partitioner and assign each node a num_tokens value. The number of tokens you assign depends on the hardware capabilities of the system. If you are not using virtual nodes (vnodes), use the initial_token setting instead.

  • Replication factor

The total number of replicas across the cluster. A replication factor of 1 means that there is only one copy of each row on one node. A replication factor of 2 means two copies of each row, where each copy is on a different node. All replicas are equally important; there is no primary or master replica. You define the replication factor for each datacenter. Generally, you should set the replication factor greater than one, but no more than the number of nodes in the cluster.

  • Replica placement strategy

Cassandra stores copies (replicas) of data on multiple nodes to ensure reliability and fault tolerance. A replication strategy determines which nodes to place replicas on. The first replica of data is simply the first copy; it is not unique in any sense. The NetworkTopologyStrategy is highly recommended for most deployments because it is much easier to expand to multiple datacenters when required by future expansion.

When creating a keyspace, you must define the replica placement strategy and the number of replicas you want.
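
For example, a keyspace using NetworkTopologyStrategy can define a different replication factor per datacenter; the keyspace name and the datacenter names DC1 and DC2 below are placeholders and must match whatever your snitch reports:

CREATE KEYSPACE app_data
WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'DC1': 3,    -- three replicas of each row in datacenter DC1
    'DC2': 2     -- two replicas of each row in datacenter DC2
};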

  • Snitch

A snitch defines groups of machines into datacenters and racks (the topology) that the replication strategy uses to place replicas.

You must configure a snitch when you create a cluster. All snitches use a dynamic snitch layer, which monitors performance and chooses the best replica for reading. It is enabled by default and recommended for use in most deployments. Configure dynamic snitch thresholds for each node in the cassandra.yaml configuration file.

The default SimpleSnitch does not recognize datacenter or rack information. Use it for single-datacenter deployments or single-zone in public clouds. The GossipingPropertyFileSnitch is recommended for production. It defines a node’s datacenter and rack and uses gossip for propagating this information to other nodes.

  • The cassandra.yaml configuration file

The main configuration file for setting the initialization properties for a cluster, caching parameters for tables, properties for tuning and resource utilization, timeout settings, client connections, backups, and security.

By default, a node is configured to store the data it manages in a directory set in the cassandra.yaml file.

In a production cluster deployment, you can change the commitlog_directory to a different disk drive from the data_file_directories.

  • System keyspace table properties

You set storage configuration attributes on a per-keyspace or per-table basis programmatically or using a client application, such as CQL.

Storage engine

Cassandra uses a storage structure similar to a Log-Structured Merge Tree, unlike a typical relational database that uses a B-Tree. Cassandra avoids reading before writing. Read-before-write, especially in a large distributed system, can result in large latencies in read performance and other problems. For example, two clients read at the same time; one overwrites the row to make update A, and the other overwrites the row to make update B, removing update A. This race condition will result in ambiguous query results – which update is correct?

To avoid using read-before-write for most writes in Cassandra, the storage engine groups inserts and updates in memory, and at intervals, sequentially writes the data to disk in append mode. Once written to disk, the data is immutable and is never overwritten. Reading data involves combining this immutable sequentially-written data to discover the correct query results. You can use Lightweight transactions (LWT) to check the state of the data before writing. However, this feature is recommended only for limited use.
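
Lightweight transactions are expressed in CQL with IF conditions. A hedged sketch, assuming a hypothetical users table keyed by username:

-- Insert only if no row with this key exists yet (a read precedes the write, via Paxos)
INSERT INTO users (username, email)
VALUES ('jdoe', 'jdoe@example.com')
IF NOT EXISTS;

-- Update only if the current value matches the expected one
UPDATE users SET email = 'new@example.com'
WHERE username = 'jdoe'
IF email = 'jdoe@example.com';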

A log-structured engine that avoids overwrites and uses sequential I/O to update data is essential for writing to solid-state disks (SSD) and hard disks (HDD). On HDD, writing randomly involves a higher number of seek operations than sequential writing. The seek penalty incurred can be substantial. Because Cassandra sequentially writes immutable files, thereby avoiding write amplification and disk failure, the database accommodates inexpensive, consumer SSDs extremely well. For many other databases, write amplification is a problem on SSDs.

How Cassandra reads and writes data

To manage and access data in Cassandra, it is important to understand how Cassandra stores data. The hinted handoff feature and Cassandra’s conformance and non-conformance to the ACID (atomic, consistent, isolated, durable) database properties are key concepts for understanding reads and writes. In Cassandra, consistency refers to how up-to-date and synchronized a row of data is on all of its replicas.

Client utilities and application programming interfaces (APIs) for developing applications for data storage and retrieval are available.

How is data written?

Cassandra processes data at several stages on the write path, starting with the immediate logging of a write and ending with a write of data to disk:

  • Logging data in the commit log
  • Writing data to the memtable
  • Flushing data from the memtable
  • Storing data on disk in SSTables

Logging writes and memtable storage

When a write occurs, Cassandra stores the data in a memory structure called memtable, and to provide configurable durability, it also appends writes to the commit log on disk. The commit log receives every write made to a Cassandra node, and these durable writes survive permanently even if power fails on a node. The memtable is a write-back cache of data partitions that Cassandra looks up by key. The memtable stores writes in sorted order until reaching a configurable limit, and then is flushed.

Flushing data from the memtable

To flush the data, Cassandra writes the data to disk, in the memtable-sorted order. A partition index is also created on the disk that maps the tokens to a location on disk. When the memtable content exceeds the configurable threshold or the commit log space exceeds commitlog_total_space_in_mb, the memtable is put in a queue that is flushed to disk. The queue can be configured with the memtable_heap_space_in_mb or memtable_offheap_space_in_mb setting in the cassandra.yaml file. If the data to be flushed exceeds the memtable_cleanup_threshold, Cassandra blocks writes until the next flush succeeds. You can manually flush a table using nodetool flush or nodetool drain (which flushes memtables without listening for connections to other nodes). To reduce the commit log replay time, the recommended best practice is to flush the memtable before you restart the nodes. If a node stops working, replaying the commit log restores to the memtable the writes that were there before it stopped.

Data in the commit log is purged after its corresponding data in the memtable is flushed to an SSTable on disk.

Storing data on disk in SSTables

Memtables and SSTables are maintained per table. The commit log is shared among tables. SSTables are immutable, not written to again after the memtable is flushed. Consequently, a partition is typically stored across multiple SSTable files. A number of other SSTable structures exist to assist read operations:

For each SSTable, Cassandra creates these structures:

Data (Data.db)

The SSTable data

Primary Index (Index.db)

Index of the row keys with pointers to their positions in the data file

Bloom filter (Filter.db)

A structure stored in memory that checks if row data exists in the memtable before accessing SSTables on disk

Compression Information (CompressionInfo.db)

A file holding information about uncompressed data length, chunk offsets and other compression information

Statistics (Statistics.db)

Statistical metadata about the content of the SSTable

Digest (Digest.crc32, Digest.adler32, Digest.sha1)

A file holding the adler32 checksum of the data file

CRC (CRC.db)

A file holding the CRC32 for chunks in an uncompressed file.

SSTable Index Summary (SUMMARY.db)

A sample of the partition index stored in memory

SSTable Table of Contents (TOC.txt)

A file that lists all components of the SSTable

Secondary Index (SI_.*.db)

Built-in secondary index. Multiple SIs may exist per SSTable

The SSTables are files stored on disk. The naming convention for SSTable files has changed with Cassandra 2.2 and later to shorten the file path. The data files are stored in a data directory that varies with installation. For each keyspace, a directory within the data directory stores each table. For example, /data/data/ks1/cf1-5be396077b811e3a3ab9dc4b9ac088d/la-1-big-Data.db represents a data file. ks1 represents the keyspace name to distinguish the keyspace for streaming or bulk loading data. A hexadecimal string, 5be396077b811e3a3ab9dc4b9ac088d in this example, is appended to table names to represent unique table IDs.

Cassandra creates a subdirectory for each table, which allows you to symlink a table to a chosen physical drive or data volume. This provides the capability to move very active tables to faster media, such as SSDs for better performance, and also divides tables across all attached storage devices for better I/O balance at the storage layer.

How is data maintained?

The Cassandra write process stores data in files called SSTables. SSTables are immutable. Instead of overwriting existing rows with inserts or updates, Cassandra writes new timestamped versions of the inserted or updated data in new SSTables. Cassandra does not perform deletes by removing the deleted data: instead, Cassandra marks it with tombstones.

Over time, Cassandra may write many versions of a row in different SSTables. Each version may have a unique set of columns stored with a different timestamp. As SSTables accumulate, the distribution of data can require accessing more and more SSTables to retrieve a complete row.

To keep the database healthy, Cassandra periodically merges SSTables and discards old data. This process is called compaction.

Compaction

Compaction works on a collection of SSTables. From these SSTables, compaction collects all versions of each unique row and assembles one complete row, using the most up-to-date version (by timestamp) of each of the row’s columns. The merge process is performant, because rows are sorted by partition key within each SSTable, and the merge process does not use random I/O. The new version of each row is written to a new SSTable. The old versions, along with any rows that are ready for deletion, are left in the old SSTables, and are deleted as soon as pending reads are completed.

Compaction causes a temporary spike in disk space usage and disk I/O while old and new SSTables co-exist. As it completes, compaction frees up disk space occupied by old SSTables. It improves read performance by incrementally replacing old SSTables with compacted SSTables. Cassandra can read data directly from the new SSTable even before it finishes writing, instead of waiting for the entire compaction process to finish.

As Cassandra processes writes and reads, it replaces the old SSTables with new SSTables in the page cache. The process of caching the new SSTable, while directing reads away from the old one, is incremental; it does not cause a dramatic cache miss. Cassandra provides predictable, high performance even under heavy load.

Compaction strategies

Cassandra supports different compaction strategies, which control how SSTables are chosen for compaction and how the compacted rows are sorted into new SSTables. Each strategy has its own strengths. The sections that follow explain Cassandra’s compaction strategies.

Although each of the following sections starts with a generalized recommendation, many factors complicate the choice of a compaction strategy.

SizeTieredCompactionStrategy (STCS)

Recommended for write-intensive workloads.

The SizeTieredCompactionStrategy (STCS) initiates compaction when Cassandra has accumulated a set number (default: 4) of similar-sized SSTables. STCS merges these SSTables into one larger SSTable. As these larger SSTables accumulate, STCS merges these into even larger SSTables. At any given time, several SSTables of varying sizes are present.

[Figure: Size-tiered compaction after many inserts]

While STCS works well to compact a write-intensive workload, it makes reads slower because the merge-by-size process does not group data by rows. This makes it more likely that versions of a particular row may be spread over many SSTables. Also, STCS does not evict deleted data predictably because its trigger for compaction is SSTable size, and SSTables might not grow quickly enough to merge and evict old data. As the largest SSTables grow in size, the amount of disk space needed for both the new and old SSTables simultaneously during STCS compaction can outstrip a typical amount of disk space on a node.

  • Pros: Compacts write-intensive workload very well.
  • Cons: Can hold onto stale data too long. The amount of memory needed increases over time.

To implement the best compaction strategy:

  1. Review your application’s requirements.
  2. Configure the table to use the most appropriate strategy.
  3. Test the compaction strategies against your data.

The following questions are based on the experiences of Cassandra developers and users with the strategies described above.

Does your table process time series data?

If so, your best choices are TWCS or DTCS. For details, read the descriptions on this page. If your table is not focused on time series data, the choice becomes more complicated. The following questions introduce other considerations that may guide your choice.

Does your table handle more reads than writes, or more writes than reads?

LCS is a good choice if your table processes twice as many reads as writes or more – especially randomized reads. If the proportion of reads to writes is closer, the performance hit exacted by LCS may not be worth the benefit. Be aware that LCS can be quickly overwhelmed by a high volume of writes.

Does the data in your table change often?

One advantage of LCS is that it keeps related data in a small set of SSTables. If your data is immutable or not subject to frequent upserts, STCS accomplishes the same type of grouping without the LCS performance hit.

Do you require predictable levels of read and write activity?

LCS keeps the SSTables within predictable sizes and numbers. For example, if your table’s read/write ratio is small and it is expected to conform to Service Level Agreements (SLAs) for reads, it may be worth taking the write performance penalty of LCS in order to keep read rates and latency at predictable levels. You may be able to overcome this write penalty through horizontal scaling (adding more nodes).

Will your table be populated by a batch process?

On both batch reads and batch writes, STCS performs better than LCS. The batch process causes little or no fragmentation, so the benefits of LCS are not realized; batch processes can overwhelm LCS-configured tables.

Does your system have limited disk space?

LCS handles disk space more efficiently than STCS: it requires about 10% headroom in addition to the space occupied by the data it handles. STCS and DTCS generally require more headroom, in some cases as much as 50% more than the data space.

Is your system reaching its limits for I/O?

LCS is significantly more I/O intensive than DTCS or STCS. Switching to LCS may introduce extra I/O load that offsets the advantages.

Testing compaction strategies

Suggestions for determining which compaction strategy is best for your system:

  • Create a three-node cluster using one of the compaction strategies, stress test the cluster using cassandra-stress, and measure the results.
  • Set up a node on your existing cluster and use Cassandra’s write survey mode to sample live data.

Configuring and running compaction

Set the compaction strategy for a table in the parameters for the CREATE TABLE or ALTER TABLE command.
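
For example, an existing table could be switched to SizeTieredCompactionStrategy like this (the table name and the threshold value are illustrative):

ALTER TABLE cycling.cyclist_name
WITH compaction = {
    'class': 'SizeTieredCompactionStrategy',
    'min_threshold': 4    -- number of similar-sized SSTables that triggers a compaction
};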

You can start compaction manually using the nodetool compact command.

How is data updated?

Cassandra treats each new row as an upsert: if the new row has the same primary key as that of an existing row, Cassandra processes it as an update to the existing row.

During a write, Cassandra adds each new row to the database without checking on whether a duplicate record exists. This policy makes it possible that many versions of the same row may exist in the database. For more details about writes, see How is data written?
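
A small illustration with a hypothetical users table: two INSERTs that share a primary key produce one logical row, and the later write wins column by column:

INSERT INTO users (username, email) VALUES ('jdoe', 'old@example.com');
INSERT INTO users (username, email) VALUES ('jdoe', 'new@example.com');

-- Returns a single row for 'jdoe' whose email is 'new@example.com' (last write wins)
SELECT * FROM users WHERE username = 'jdoe';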

Periodically, the rows stored in memory are streamed to disk into structures called SSTables. At certain intervals, Cassandra compacts smaller SSTables into larger SSTables. If Cassandra encounters two or more versions of the same row during this process, Cassandra only writes the most recent version to the new SSTable. After compaction, Cassandra drops the original SSTables, deleting the outdated rows.

Most Cassandra installations store replicas of each row on two or more nodes. Each node performs compaction independently. This means that even though out-of-date versions of a row have been dropped from one node, they may still exist on another node.

This is why Cassandra performs another round of comparisons during a read process. When a client requests data with a particular primary key, Cassandra retrieves many versions of the row from one or more replicas. The version with the most recent timestamp is the only one returned to the client (“last-write-wins”).

How is data deleted?

Cassandra’s processes for deleting data are designed to improve performance, and to work with Cassandra’s built-in properties for data distribution and fault-tolerance.

Cassandra treats a delete as an insert or upsert. The data being added to the partition in the DELETE command is a deletion marker called a tombstone. The tombstones go through Cassandra’s write path and are written to SSTables on one or more nodes. The key feature of a tombstone is that it has a built-in expiration date/time. At the end of its expiration period (see below for details), the tombstone is deleted as part of Cassandra’s normal compaction process.

You can also mark a Cassandra record (row or column) with a time-to-live value. After this amount of time has ended, Cassandra marks the record with a tombstone, and handles it like other tombstoned records.
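
In CQL the time-to-live is given in seconds with USING TTL; the sessions table below is purely illustrative:

-- This row expires 86400 seconds (one day) after it is written
INSERT INTO sessions (session_id, username)
VALUES (123e4567-e89b-12d3-a456-426655440000, 'jdoe')
USING TTL 86400;

-- Check how many seconds remain before a column expires
SELECT TTL(username) FROM sessions
WHERE session_id = 123e4567-e89b-12d3-a456-426655440000;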

Deletion in a distributed system

In a multi-node cluster, Cassandra can store replicas of the same data on two or more nodes. This helps prevent data loss, but it complicates the delete process. If a node receives a delete for data it stores locally, the node tombstones the specified record and tries to pass the tombstone to other nodes containing replicas of that record. But if one replica node is unresponsive at that time, it does not receive the tombstone immediately, so it still contains the pre-delete version of the record. If the tombstoned record has already been deleted from the rest of the cluster before that node recovers, Cassandra treats the record on the recovered node as new data and propagates it to the rest of the cluster. This kind of deleted but persistent record is called a zombie.

To prevent the reappearance of zombies, the database gives each tombstone a grace period. The purpose of the grace period is to give unresponsive nodes time to recover and process tombstones normally. When multiple replica answers are part of a read request, and those responses differ, then whichever values are most recent take precedence. For example, if a node has a tombstone but another node has a more recent change, then the final result includes the more recent change.

If a node has a tombstone and another node has only an older value for the record, then the final record will have the tombstone. If a client writes a new update to the tombstone during the grace period, the database overwrites the tombstone.

When an unresponsive node recovers, Cassandra uses hinted handoff to replay the database mutations the node missed while it was down. Cassandra does not replay a mutation for a tombstoned record during its grace period. But if the node does not recover until after the grace period ends, Cassandra may miss the deletion.

After the tombstone’s grace period ends, Cassandra deletes the tombstone during compaction.

The grace period for a tombstone is set by the property gc_grace_seconds. Its default value is 864000 seconds (ten days). Each table can have its own value for this property.
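
Because gc_grace_seconds is a table-level property, it can be adjusted with ALTER TABLE; a sketch (table name illustrative, and the value should remain longer than your routine repair interval):

ALTER TABLE cycling.cyclist_name
WITH gc_grace_seconds = 432000;    -- five days instead of the default ten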

How are indexes stored and updated?

Secondary indexes are used to filter a table for data stored in non-primary key columns. For example, a table storing cyclist names and ages using the last name of the cyclist as the primary key might have a secondary index on the age to allow queries by age. Querying to match a non-primary key column is an anti-pattern, as querying should always result in a continuous slice of data retrieved from the table.

If the table rows are stored based on last names, the table may be spread across several partitions stored on different nodes. Queries based on a particular range of last names, such as all cyclists with the last name Matthews, will retrieve sequential rows from the table, but a query based on the age, such as all cyclists who are 28, will require all nodes to be queried for a value. Non-primary keys play no role in ordering the data in storage, so querying for a particular value of a non-primary key column results in scanning all partitions. Scanning all partitions generally results in prohibitive read latency and is not allowed.

Secondary indexes can be built for a column in a table. These indexes are stored locally on each node in a hidden table and built in a background process. If a secondary index is used in a query that is not restricted to a particular partition key, the query will have prohibitive read latency because all nodes will be queried. A query with these parameters is only allowed if the query option ALLOW FILTERING is used. This option is not appropriate for production environments. If a query includes both a partition key condition and a secondary index column condition, the query will be successful because it can be directed to a single partition.
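
Continuing the cyclist example, a sketch of the schema, the index, and the two kinds of queries might look like this (all names are hypothetical):

CREATE TABLE cyclists (
    lastname text,
    firstname text,
    age int,
    PRIMARY KEY (lastname, firstname)    -- lastname is the partition key
);

CREATE INDEX cyclist_age_idx ON cyclists (age);

-- Restricted to one partition: only the nodes owning 'Matthews' are involved
SELECT * FROM cyclists WHERE lastname = 'Matthews' AND age = 28;

-- No partition key restriction: the index is consulted on every node, which is expensive
SELECT * FROM cyclists WHERE age = 28;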

This technique, however, does not guarantee trouble-free indexing, so know when and when not to use an index. In the example shown above, an index on the age could be used, but a better solution is to create a materialized view or additional table that is ordered by age.

As with relational databases, keeping indexes up to date uses processing time and resources, so unnecessary indexes should be avoided. When a column is updated, the index is updated as well. If the old column value still exists in the memtable, which typically occurs when updating a small set of rows repeatedly, Cassandra removes the corresponding obsolete index entry; otherwise, the old entry remains to be purged by compaction. If a read sees a stale index entry before compaction purges it, the reader thread invalidates it.

How is data read?

To satisfy a read, Cassandra must combine results from the active memtable and potentially multiple SSTables.

Cassandra processes data at several stages on the read path to discover where the data is stored, starting with the data in the memtable and finishing with SSTables:

  • Checks the memtable
  • Checks the row cache, if enabled
  • Checks the Bloom filter
  • Checks the partition key cache, if enabled
  • Goes directly to the compression offset map if a partition key is found in the partition key cache, or checks the partition summary if not (if the partition summary is checked, the partition index is then accessed)
  • Locates the data on disk using the compression offset map
  • Fetches the data from the SSTable on disk

[Figure: Read request flow]

[Figure: Row cache and key cache request flow]

Memtable

If the memtable has the desired partition data, then the data is read and then merged with the data from the SSTables. The SSTable data is accessed as shown in the following steps.

Row Cache

Typical of any database, reads are fastest when the most in-demand data fits into memory. The operating system page cache is best at improving performance, although the row cache can provide some improvement for very read-intensive workloads, where read operations are 95% of the load. The row cache is contra-indicated for write-intensive operations. The row cache, if enabled, stores in memory a subset of the partition data stored on disk in the SSTables. In Cassandra 2.2 and later, it is stored in fully off-heap memory using a new implementation that relieves garbage collection pressure in the JVM. The subset stored in the row cache uses a configurable amount of memory for a specified period of time. The row cache uses LRU (least-recently-used) eviction to reclaim memory when the cache has filled up.

The row cache size is configurable, as is the number of rows to store. Configuring the number of rows to be stored is a useful feature, making a “Last 10 Items” query very fast to read. If row cache is enabled, desired partition data is read from the row cache, potentially saving two seeks to disk for the data. The rows stored in row cache are frequently accessed rows that are merged and saved to the row cache from the SSTables as they are accessed. After storage, the data is available to later queries. The row cache is not write-through. If a write comes in for the row, the cache for that row is invalidated and is not cached again until the row is read. Similarly, if a partition is updated, the entire partition is evicted from the cache. When the desired partition data is not found in the row cache, then the Bloom filter is checked.

Bloom Filter

First, Cassandra checks the Bloom filter to discover which SSTables are likely to have the requested partition data. The Bloom filter is stored in off-heap memory, and each SSTable has a Bloom filter associated with it. A Bloom filter can establish that an SSTable does not contain certain partition data, or indicate the likelihood that partition data is stored in an SSTable. It speeds up the process of partition key lookup by narrowing the pool of keys. However, because the Bloom filter is a probabilistic function, it can produce false positives: not all SSTables identified by the Bloom filter will have the data. If the Bloom filter does not rule out an SSTable, Cassandra checks the partition key cache.

The Bloom filter grows to approximately 1-2 GB per billion partitions. In the extreme case, you can have one partition per row, so you can easily have billions of these entries on a single machine. The Bloom filter is tunable if you want to trade memory for performance.

Partition Key Cache

The partition key cache, if enabled, stores a cache of the partition index in off-heap memory. The key cache uses a small, configurable amount of memory, and each “hit” saves one seek during the read operation. If a partition key is found in the key cache, Cassandra can go directly to the compression offset map to find the compressed block on disk that has the data. The partition key cache functions better once warmed, and it can greatly improve performance over cold-start reads, where the key cache does not yet have (or has purged) the keys. It is possible to limit the number of partition keys saved in the key cache if memory is very limited on a node. If a partition key is not found in the key cache, then the partition summary is searched.

The partition key cache size is configurable, as are the number of partition keys to store in the key cache.

Partition Summary

The partition summary is an off-heap, in-memory structure that stores a sampling of the partition index. A partition index contains all partition keys, whereas a partition summary samples every Xth key and maps that key’s location in the index file. For example, if the partition summary is set to sample every 20 keys, it stores the location of the first key (the beginning of the SSTable file), the 20th key and its location in the file, and so on. While not as exact as knowing the location of the partition key, the partition summary can shorten the scan to find the partition data location. After finding the range of possible partition key values, the partition index is searched.

By configuring the sample frequency, you can trade memory for performance, as the more granularity the partition summary has, the more memory it will use. The sample frequency is changed using the index interval property in the table definition. A fixed amount of memory is configurable using the index_summary_capacity_in_mb property, and defaults to 5% of the heap size.

Partition Index

The partition index resides on disk and stores an index of all partition keys mapped to their offset. If the partition summary has been checked for a range of partition keys, now the search passes to the partition index to seek the location of the desired partition key. A single seek and sequential read of the columns over the passed-in range is performed. Using the information found, the partition index now goes to the compression offset map to find the compressed block on disk that has the data. If the partition index must be searched, two seeks to disk will be required to find the desired data.

How are Cassandra transactions different from RDBMS transactions?

Cassandra does not use RDBMS ACID transactions with rollback or locking mechanisms, but instead offers atomic, isolated, and durable transactions with eventual/tunable consistency that lets the user decide how strong or eventual they want each transaction’s consistency to be.

As a non-relational database, Cassandra does not support joins or foreign keys, and consequently does not offer consistency in the ACID sense; for example, it does not enforce the invariant that when money moves from account A to account B, the total in the two accounts stays unchanged. Cassandra supports atomicity and isolation at the row level, but trades transactional isolation and atomicity for high availability and fast write performance. Cassandra writes are durable.

Atomicity

In Cassandra, a write operation is atomic at the partition level, meaning the insertions or updates of two or more rows in the same partition are treated as one write operation. A delete operation is also atomic at the partition level.

For example, if using a write consistency level of QUORUM with a replication factor of 3, Cassandra sends the write to all replicas of that row (three nodes) and waits for acknowledgement from two of them. If the write fails on one of the replicas but succeeds on the others, Cassandra reports a failure to replicate the write on that replica. However, the replicated write that succeeds on the other replicas is not automatically rolled back.

Cassandra uses client-side timestamps to determine the most recent update to a column. The latest timestamp always wins when requesting data, so if multiple client sessions update the same columns in a row concurrently, the most recent update is the one seen by readers.

Isolation

Cassandra write and delete operations are performed with full row-level isolation. This means that a write to a row within a single partition on a single node is only visible to the client performing the operation – the operation is restricted to this scope until it is complete. All updates in a batch operation belonging to a given partition key have the same restriction. However, a batch operation is not isolated if it includes changes to more than one partition.
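
As an illustration, a logged batch whose statements all target the same partition is applied as one isolated write; adding a statement for a different partition key would break that isolation (the users table is hypothetical):

BEGIN BATCH
    INSERT INTO users (username, email) VALUES ('jdoe', 'jdoe@example.com');
    UPDATE users SET last_login = toTimestamp(now()) WHERE username = 'jdoe';
APPLY BATCH;    -- both statements belong to partition 'jdoe', so the batch is isolated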

Durability

Writes in Cassandra are durable. All writes to a replica node are recorded both in memory and in a commit log on disk before they are acknowledged as a success. If a crash or server failure occurs before the memtables are flushed to disk, the commit log is replayed on restart to recover any lost writes. In addition to the local durability (data immediately written to disk), the replication of data on other nodes strengthens durability.

You can manage the local durability to suit your needs for consistency using the commitlog_sync option in the cassandra.yaml file. Set the option to either periodic or batch.

Cassandra support for integrating Hadoop includes:

  • MapReduce
  • You must run separate datacenters: one or more datacenters with nodes running just Cassandra (for online transaction processing) and others with nodes running Cassandra with Hadoop installed. See Isolate Cassandra and Hadoop for details.
  • Before starting the datacenters of Cassandra/Hadoop nodes, disable virtual nodes (vnodes).

To disable virtual nodes:

  1. In the cassandra.yaml file, set num_tokens to 1.
  2. Uncomment the initial_token property and set it to 1 or to the value of a generated token for a multi-node cluster.
  3. Start the cluster for the first time.

Setup and configuration involves overlaying a Hadoop cluster on Cassandra nodes, configuring a separate server for the Hadoop NameNode/JobTracker, and installing a Hadoop TaskTracker and Data Node on each Cassandra node. The nodes in the Cassandra datacenter can draw from data in the HDFS Data Node as well as from Cassandra. The Job Tracker/Resource Manager (JT/RM) receives MapReduce input from the client application and sends the MapReduce job request to the Task Trackers/Node Managers (TT/NM). The data is written to Cassandra and the results are sent back to the client.

The Apache docs also cover how to get configuration and integration support.

Input and Output Formats

Hadoop jobs can receive data from CQL tables and indexes and can write their output to Cassandra tables as well as to the Hadoop FileSystem. Cassandra 3.0 supports the following formats for these tasks:

  • CqlInputFormat class: for importing job input into the Hadoop filesystem from CQL tables
  • CqlOutputFormat class: for writing job output from the Hadoop filesystem to CQL tables
  • CqlBulkOutputFormat class: generates Cassandra SSTables from the output of Hadoop jobs, then loads them into the cluster using the SSTableLoaderBulkOutputFormat class

Reduce tasks can store keys (and corresponding bound variable values) as CQL rows (and respective columns) in a given CQL table.

Running the wordcount example

Wordcount example JARs are located in the examples directory of the Cassandra source code installation. There are CQL and legacy examples in the hadoop_cql3_word_count and hadoop_word_count subdirectories, respectively. Follow instructions in the readme files.

Isolating Hadoop and Cassandra workloads

When you create a keyspace using CQL, Cassandra automatically creates a virtual datacenter for a cluster, even a one-node cluster. You assign nodes that run the same type of workload to the same datacenter. The separate, virtual datacenters for different types of nodes segregate workloads running Hadoop from those running Cassandra. Segregating workloads ensures that only one type of workload is active per datacenter. Separating nodes running a sequential data load from nodes running any other type of workload, such as Cassandra real-time OLTP queries, is a best practice.

The cassandra utility

You can run Cassandra 3.0 with start-up parameters by adding them to the cassandra-env.sh file (package or tarball installations). You can also enter parameters at the command line when starting up tarball installations.

Usage

Add a parameter to the cassandra-env.sh file as follows:

JVM_OPTS="$JVM_OPTS -D[PARAMETER]"

When starting up a tarball installation, you can add parameters at the command line:

cassandra [PARAMETERS]

Examples:

  • Command line: $ bin/cassandra -Dcassandra.load_ring_state=false
  • cassandra-env.sh: JVM_OPTS="$JVM_OPTS -Dcassandra.load_ring_state=false"

Command line options

Option         Description
-f             Start the cassandra process in the foreground. The default is to start as a background process.
-h             Help.
-p filename    Log the process ID in the named file. Useful for stopping Cassandra by killing its PID.
-v             Print the version and exit.

Start-up parameters

The -D option specifies start-up parameters at the command line and in the cassandra-env.sh file.

cassandra.auto_bootstrap=false

Sets auto_bootstrap to false on initial set-up of the cluster. The next time you start the cluster, you do not need to change the cassandra.yaml file on each node to revert to true.

cassandra.available_processors=number_of_processors

In a multi-instance deployment, each Cassandra instance independently assumes that all CPU processors are available to it. Use this setting to specify a smaller set of processors.

cassandra.boot_without_jna=true

Configures Cassandra to boot without JNA. If you do not set this parameter to true and JNA does not initialize, Cassandra does not boot.

cassandra.config=directory

Sets the directory location of the cassandra.yaml file. The default location depends on the type of installation.

cassandra.expiration_date_overflow_policy=POLICY

Set the policy for TTL (time to live) timestamps that exceed the maximum value supported by the storage engine, 2038-01-19T03:14:06+00:00. The database storage engine can only encode TTL timestamps through January 19 2038 03:14:07 UTC due to the Year 2038 problem.

REJECT: Reject requests that contain an expiration timestamp later than 2038-01-19T03:14:06+00:00.

CAP: Allow requests and insert expiration timestamps later than 2038-01-19T03:14:06+00:00 as 2038-01-19T03:14:06+00:00.

Default: REJECT.

cassandra.ignore_dynamic_snitch_severity=true|false (Default: false)

Setting this property to true causes the dynamic snitch to ignore the severity indicator from gossip when scoring nodes. Severity is a numeric representation of a node based on compaction events occurring on it, which it broadcasts via gossip. This factors into the dynamic snitch’s formula, unless overridden.

Future versions will default to true, and this setting will be removed. See Failure detection and recovery and Dynamic snitching in Cassandra: past, present, and future.

cassandra.initial_token=token

Use when Cassandra is not using virtual nodes (vnodes). Sets the initial partitioner token for a node the first time the node is started. (Default: disabled)

Note: Vnodes automatically select tokens.

cassandra.join_ring=true|false

When set to false, prevents the Cassandra node from joining a ring on startup. (Default: true) You can add the node to the ring afterwards using nodetool join and a JMX call.

cassandra.load_ring_state=true|false

When set to false, clears all gossip state for the node on restart. (Default: true)

cassandra.metricsReporterConfigFile=file

Enables pluggable metrics reporter. See Pluggable metrics reporting in Cassandra 2.0.2.

cassandra.native_transport_startup_delay_second=seconds

Delays the startup of the native transport by the specified number of seconds. (Default: 0)

cassandra.native_transport_port=port

Sets the port on which the CQL native transport listens for clients. (Default: 9042)

cassandra.partitioner=partitioner

Sets the partitioner. (Default: org.apache.cassandra.dht.Murmur3Partitioner)

cassandra.replace_address=listen_address or broadcast_address of dead node

To replace a node that has died, restart a new node in its place specifying the listen_address or broadcast_address that the new node is assuming. The new node must be in the same state as before bootstrapping, without any data in its data directory.

cassandra.replayList=table

Allows restoring specific tables from an archived commit log.

cassandra.ring_delay_ms=ms

Defines the amount of time a node waits to hear from other nodes before formally joining the ring. (Default: 30000ms)

cassandra.rpc_port=port

Sets the port for the Thrift RPC service, which is used for client connections. (Default: 9160).

cassandra.ssl_storage_port=port

Sets the SSL port for encrypted communication. (Default: 7001)

cassandra.start_native_transport=true | false

Enables or disables the native transport server. See start_native_transport in cassandra.yaml. (Default: true)

cassandra.start_rpc=true | false

Enables or disables the Thrift RPC server. (Default: true)

cassandra.storage_port=port

Sets the port for inter-node communication. (Default: 7000)

cassandra.triggers_dir=directory

Sets the default location for the triggers JARs.

cassandra.write_survey=true

Enables a tool for testing new compaction and compression strategies to experiment with different strategies and benchmark write performance differences without affecting the production workload. See Testing compaction and compression.

cassandra.consistent.rangemovement=true|false

When set to true, nodes use consistent range movements during bootstrap.

Tip: You can also add options such as maximum and minimum heap size to the cassandra-env.sh file to pass them to the Java virtual machine at startup, rather than setting them in the environment.

Example

Clearing gossip state when starting a node:

Command line: $ bin/cassandra -Dcassandra.load_ring_state=false

cassandra-env.sh: JVM_OPTS="$JVM_OPTS -Dcassandra.load_ring_state=false"

Example

Starting a Cassandra node without joining the ring:

Command line: $ bin/cassandra -Dcassandra.join_ring=false # starts Cassandra without joining the ring

cassandra-env.sh: JVM_OPTS="$JVM_OPTS -Dcassandra.join_ring=false"

Example

Replacing a dead node:

Command line:

$ bin/cassandra -Dcassandra.replace_address=10.91.176.160

cassandra-env.sh: JVM_OPTS="$JVM_OPTS -Dcassandra.replace_address=10.91.176.160"