Designing Data Intensive Applications Notes
Taking some notes on "Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems" by Martin Kleppmann because I want to know about designing data-intensive applications. This book should help me make sure that the backend and retrieval of information from the database is fast. I also want to learn more about storing irregular data like JSON in SQL.
References
- Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems by Martin Kleppmann
Other Recommended Readings
- Web Scalability for Startup Engineers by Artur Ejsmont
Foundations of Data Systems
Reliable, Scalable, and Maintainable Applications
Many applications today are data-intensive, as opposed to compute-intensive. Raw CPU power is rarely a limiting factor for these applications - bigger problems are usually the amount of data, the complexity of data, and the speed at which it is changing.
Many applications need to:
- Store data so that they, or another application, can find it again later (databases)
- Remember the result of an expensive operation, to speed up reads (caches)
- Allow users to search data by keyword or filter it in various ways (search indexes)
- Send a message to another process, to be handled asynchronously (stream processing)
- Periodically crunch a large amount of accumulated data (batch processing)
- In this book, we focus on three concerns that are important in most software systems:
- Reliability
- The system should continue to work correctly (performing the correct function at the desired level of performance) even in the face of adversity (hardware or software faults, and even human error).
- Things should continue to work correctly, even when things go wrong.
- Things that can go wrong are called faults, and systems that anticipate faults and can cope with them are called fault-tolerant or resilient.
- A fault is usually defined as one component of a system deviating from its spec, whereas a failure is when the system as a whole stops providing the required service to the user. It is impossible to reduce the probability of a fault to zero; therefore it is usually best to design fault-tolerance mechanisms that prevent faults from causing failures.
- Scalability
- As the system grows (in data volume, traffic volume, or complexity), there should be reasonable ways of dealing with that growth
- Maintainability
- Over time, many different people will work on the system (engineering and operations, both maintaining current behavior and adapting the system to new use cases), and they should all be able to work on it productively
Reliability
- Hardware Faults
- Hard disks are reported as having a mean time to failure (MTTF) of about 10 to 50 years. (On a storage cluster of 10,000 disks, we should expect one disk to die per day).
- Our first response is usually to add redundancy to the individual hardware components in order to reduce the failure rate of the system.
- There is a move toward systems that can tolerate the loss of entire machines, by using software fault-tolerance techniques in preference to, or in addition to, hardware redundancy.
- Software Faults
- The bugs that cause software faults (e.g., June 30, 2012 Linux Kernel Bug) often lie dormant for a long time until they are triggered by an unusual set of circumstances. In those circumstances, it is revealed that the software is making some kind of assumption about its environment - and while that assumption is usually true, it eventually stops being true for some reason.
- Human Errors
- One study of large internet services found that configuration errors by operators were the leading cause of outages.
- Reliability is very important - you should be very cautious of cutting corners.
Scalability
- Scalability is the term we use to describe a system's ability to cope with increased load.
- Load Parameters - how we describe the kind of load a backend system experiences
- throughput - the number of records we can process per second, or the total time it takes to run a job on a dataset of a certain size
- response time - the time between the client sending a request and receiving a response
- Response time should not be thought of as a single number, but as a distribution of values that you can measure
- It is better to use percentiles to describe response time (a small percentile sketch follows this list).
- High-percentile response times, also known as tail latencies, are important because they directly affect users' experience of the service. For example, Amazon describes response time requirements for internal services in terms of the 99.9th percentile, even though it only affects 1 in 1,000 requests. This is because the customers with the slowest requests are often those who have the most data on their accounts because they have made many purchases - that is, they're the most valuable customers.
- Queueing delays often account for a large part of the response time at high percentiles. As a server can only process a small number of things in parallel, it only takes a small number of slow requests to hold up the processing of subsequent requests - an effect sometimes known as head-of-line blocking.
- People often talk of a dichotomy between scaling up (vertical scaling, moving to a more powerful machine) and scaling out (horizontal scaling, distributing the load across multiple smaller machines). Distributing load across multiple machines is also known as shared-nothing architecture.
- Good architectures usually involve a pragmatic mixture of approaches
- Some systems are elastic, meaning that they can automatically add computing resources when they detect a load increase, whereas other systems are scaled manually (a human adds more machines to the system).
- An architecture that scales well for a particular application is built around assumptions of which operations will be common and which will be rare - the load parameters.
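To make the percentile idea concrete, here is a minimal sketch (my own illustration, not from the book) that computes p50/p95/p99 response times from a list of measured latencies using a simple nearest-rank method:

# Hedged sketch: treat response time as a distribution and report percentiles.
def percentile(sorted_values, p):
    """Nearest-rank percentile: the value below which roughly p% of observations fall."""
    if not sorted_values:
        raise ValueError("no measurements")
    k = max(0, int(round(p / 100.0 * len(sorted_values))) - 1)
    return sorted_values[k]

response_times_ms = sorted([12, 15, 14, 250, 16, 13, 900, 18, 17, 14])
for p in (50, 95, 99):
    print(f"p{p}: {percentile(response_times_ms, p)} ms")
# The p99 value (900 ms here) is the tail latency that a few unlucky requests experience.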
Maintainability
- Pay attention to three design principles for software systems:
- Operability
- Make it easy for operations teams to keep the system running smoothly
- Simplicity
- Make it easy for new engineers to understand the system, by removing as much complexity as possible from the system
- Abstraction can hide a great deal of implementation detail behind a clean, simple-to-understand facade
- Evolvability
- Make it easy for engineers to make changes to the system in the future, adapting it for unanticipated use cases as requirements change. Also known as extensibility, modifiability, or plasticity.
Conclusion
- There are functional requirements (what it should do, such as allowing data to be stored, retrieved, searched, and processed in various ways), and some nonfunctional requirements (general properties like security, reliability, compliance, scalability, compatibility, and maintainability)
Data Models and Query Languages
Relational Model versus Document Model
The best known data model is probably that of SQL, based on the relational model proposed by Edgar Codd in 1970: data is organized into relations (called tables in SQL), where each relation is an unordered collection of tuples (rows in SQL).
- By the mid-1980s, relational database management systems (RDBMSes) and SQL had become the tools of choice for most people who needed to store and query data with some kind of regular structure.
- NoSQL is the latest attempt to overthrow the relational model's dominance. Driving forces behind the adoption of NoSQL databases:
- A need for greater scalability than relational databases can easily achieve, including very large datasets or very high throughput
- A widespread preference for free and open source software over commercial database products
- Specialized query operations that are not well supported by the relational model
- Frustration with the restrictiveness of relational schemas, and a desire for a more dynamic and expressive data model
- The advantages and disadvantages of SQL versus NoSQL are a topic I have read a good amount about, so I am not going to go over all of them here.
- The main arguments in favor of the document data model are schema flexibility, better performance due to locality, and that for some applications it is closer to the data structures used by the application. The relational model counters by providing better support for joins, and for many-to-one (normalization) and many-to-many relationships.
- if your application does use many-to-many relationships, the document model becomes less appealing.
- Document databases use schema-on-read (the structure of the data is implicit, and only interpreted when the data is read), in contrast with schema-on-write (the traditional approach of relational databases, where the schema is explicit and the database ensures all written data conforms to it).
- A document is usually stored as a single continuous string. If your application often needs to access the entire document, there is a performance advantage to this storage locality.
Query Languages for Data
- SQL (and CSS) is a declarative query language. JavaScript is an imperative language. An imperative language tells the computer to perform certain operations in a certain order. In a declarative query language, you must specify the pattern of the data you want - what conditions the results must meet, and how you want the data to be transformed (e.g., sorted, grouped, and aggregated) - but not how to achieve that goal. It is up to the database system's query optimizer to decide which indexes and which join methods to use, and in which order to execute various parts of the query.
- A declarative query language is attractive because it is typically more concise and easier to work with than an imperative API. But more importantly, it also hides implementation details of the database engine, which makes it possible for the database system to introduce performance improvements without requiring any changes to queries.
- Declarative languages often lend themselves to parallel execution. Today, CPUs are getting faster by adding more cores, not by running at significantly higher clock speeds than before.
- MapReduce is a programming model for processing large amounts of data in bulk across many machines. MapReduce is neither a declarative nor a fully imperative query API, but somewhere in between: the logic of the query is expressed with snippets of code, which are called repeatedly by the processing framework. It is based on the map and reduce functions that are available in many functional programming languages.
- The map and reduce functions are somewhat restricted in what they are allowed to do. They must be pure functions, which means they only use the data that is passed to them as input, they cannot perform additional database queries, and they must not have any side effects.
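As a concrete illustration of the map/reduce model, here is a word-count-style sketch in plain Python (my own illustration, not tied to any particular framework's API):

from collections import defaultdict

def map_fn(document):
    # emits (key, value) pairs; a pure function with no side effects
    for word in document.split():
        yield word.lower(), 1

def reduce_fn(key, values):
    # combines all values emitted for the same key
    return key, sum(values)

documents = ["the quick brown fox", "the lazy dog"]

grouped = defaultdict(list)          # the "shuffle": group intermediate pairs by key
for doc in documents:
    for key, value in map_fn(doc):
        grouped[key].append(value)

results = [reduce_fn(key, values) for key, values in grouped.items()]
print(results)   # e.g. [('the', 2), ('quick', 1), ('brown', 1), ...]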
Graph-Like Data Models
- A graph consists of two kinds of objects: vertices (also known as nodes or entities) and edges (also known as relationships or arcs). Many kinds of data can be modeled as a graph:
- Social Graphs
- vertices are people, and edges indicate which people know each other
- Web Graphs
- Vertices are web pages, and edges indicate HTML links to other pages
- Road or Rail Networks
- Vertices are junctions, and edges represent the roads or railway lines between them
Property Graphs
- In the property graph model, each vertex consists of:
- a unique identifier
- a set of outgoing edges
- a set of incoming edges
- a collection of properties (key-value pairs)
- each edge consists of:
- A unique identifier
- The vertex at which the edge starts
- the vertex at which the edge ends
- a label to describe the kind of relationship between the two vertices
- a collection of properties
-- Representing a property graph using a relational schema
CREATE TABLE vertices (
vertex_id integer PRIMARY KEY,
properties json
);
CREATE TABLE edges (
edge_id integer PRIMARY KEY,
tail_vertex integer REFERENCES vertices (vertex_id),
head_vertex integer REFERENCES vertices (vertex_id),
label text,
properties json
);
CREATE INDEX edges_tails ON edges (tail_vertex);
CREATE INDEX edges_heads ON edges (head_vertex);
- Important aspects of this model:
- Any vertex can have an edge connecting it with any other vertex. There is no schema that restricts which kinds of things can or cannot be associated.
- Given any vertex, you can efficiently find both its incoming and outgoing edges, and thus traverse the graph - i.e., follow a path through a chain of vertices - both forward and backward.
- By using different labels for different kinds of relationships, you can store several different kinds of information in a single graph, while still maintaining a clean data model.
- Since SQL:1999, this idea of variable-length traversal paths in a query can be expressed using recursive common table expressions (the WITH RECURSIVE syntax). A traversal sketch follows below.
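The kind of variable-length traversal that WITH RECURSIVE expresses can be sketched in Python over in-memory stand-ins for the vertices and edges tables above (roughly following the book's example of a person born in Idaho, which is within the USA; the function itself is my own illustration):

from collections import deque

vertices = {1: {"name": "Lucy"}, 2: {"name": "Idaho"}, 3: {"name": "USA"}}
edges = [(1, "born_in", 2), (2, "within", 3)]   # (tail_vertex, label, head_vertex)

def reachable(start_vertex, label):
    """All vertices reachable from start_vertex by following edges with the given label."""
    seen, queue = set(), deque([start_vertex])
    while queue:
        v = queue.popleft()
        for tail, edge_label, head in edges:
            if tail == v and edge_label == label and head not in seen:
                seen.add(head)
                queue.append(head)
    return seen

print(reachable(2, "within"))   # {3} - Idaho is within the USA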
Triple Store Graphs
- In a triple-store, all information is stored in the form of very simple three-part statements: (subject, predicate, object), e.g. (Jim, likes, bananas)
- The subject is equivalent to a vertex in a graph, and an object is one of two things:
- A value in a primitive datatype, such as a string or a number. In that case, the predicate and object of the triple are equivalent to the key and value of a property on the subject vertex.
- Another vertex in the graph. In that case, the predicate is an edge in the graph, the subject is the tail vertex, and the object is the head vertex.
Storage and Retrieval
- log - append only data file
- An index is an additional data structure that is derived from the primary data. Many databases allow you to add and remove indexes, and this doesn't affect the contents of the database; it only affects the performance of queries. Any kind of index usually slows down writes, because the index needs to be updated every time data is written. This is an important trade-off in storage systems: well-chosen indexes speed up read queries, but every index slows down writes.
Hash Indexes
- An in-memory hash map where every key is mapped to a byte offset in an append-only data file on disk (see the sketch after this list)
- Compaction - throwing away any duplicate keys in the log and only keeping the most recent update for each key.
- Since compaction makes segments much smaller, we can merge several segments together at the same time as performing compaction.
- Some of the issues that are important in the real implementation of hash indexes:
- File Format
- Deleting Records
- Crash Recovery
- Partially Written Records
- Concurrency Control
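A minimal sketch of this kind of hash index: an append-only log on disk plus an in-memory hash map from key to byte offset. The class name and the naive comma-separated record format are my own, deliberately simplified:

class HashIndexStore:
    def __init__(self, path="store.log"):
        self.path = path
        self.index = {}                    # key -> byte offset of the latest record
        open(self.path, "ab").close()      # create the log file if it doesn't exist

    def set(self, key, value):
        record = f"{key},{value}\n".encode("utf-8")   # naive record format
        with open(self.path, "ab") as f:
            offset = f.tell()              # byte offset where this record starts
            f.write(record)                # append-only write
        self.index[key] = offset           # point the key at the newest record

    def get(self, key):
        offset = self.index.get(key)
        if offset is None:
            return None
        with open(self.path, "rb") as f:
            f.seek(offset)
            line = f.readline().decode("utf-8").rstrip("\n")
            _, _, value = line.partition(",")
            return value

db = HashIndexStore()
db.set("user:42", "alice")
db.set("user:42", "bob")     # the newer record supersedes the old one
print(db.get("user:42"))     # bob; compaction would later drop the stale record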
SSTables and LSM-Trees
- SSTable, or Sorted String Table, is like a Hash Index or Hash Table except that we require the key-value pairs be sorted by key. We also require that each key only appears once within each merged segment file. SSTables advantages over log segments with hash indexes:
- Merging segments is simple and efficient, even if the files are bigger than the available memory
- You no longer need to keep an index of all keys in memory - because the keys are sorted, a sparse in-memory index is enough
- Read requests scan over several key-value pairs in a range anyway, so blocks of records can be compressed before being written to disk, which saves disk space and reduces I/O bandwidth use
- SSTable storage engine:
- When a write comes in, add it to an in-memory balanced tree data structure. This in-memory tree is sometimes called a memtable
- When a memtable gets bigger than some threshold - typically a few megabytes - write it out to the disk as an SSTable file. This can be done efficiently because the tree already maintains the key-value pairs sorted by key. The new SSTable file becomes the most recent segment of the database. When the SSTable is being written out to disk, writes can continue to a new memtable instance.
- In order to serve a read request, first try to find the key in the memtable, then in the most recent on-disk segment, then in the next-older segment, etc.
- From time to time, run a merging and compaction process in the background to combine segment files and to discard old or deleted values.
- Storage engines often use additional Bloom filters to speed up LSM-tree algorithm search. A Bloom Filter is a memory-efficient data structure for approximating the contents of a set. It can tell you if a key does not appear in the database, and thus saves many unnecessary disk reads for nonexistent keys.
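A toy version of the write/read path described above (my own sketch; the memtable here is a plain dict standing in for a balanced tree, and the flush threshold is artificially small):

class TinyLSM:
    def __init__(self, memtable_limit=4):
        self.memtable = {}              # stand-in for an in-memory balanced tree
        self.segments = []              # flushed SSTables, newest last
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self._flush()

    def _flush(self):
        # write the memtable out as a sorted, immutable segment (an "SSTable")
        self.segments.append(dict(sorted(self.memtable.items())))
        self.memtable = {}

    def get(self, key):
        if key in self.memtable:                      # 1. check the memtable
            return self.memtable[key]
        for segment in reversed(self.segments):       # 2. newest segment first
            if key in segment:
                return segment[key]
        return None   # a Bloom filter would let us skip most lookups for missing keys

db = TinyLSM()
for i in range(10):
    db.put(f"key{i}", i)
print(db.get("key3"), db.get("missing"))   # 3 None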
B-Trees
- The most widely used indexing structure: the B-tree. B-trees remain the standard index implementation in almost all relational databases, and many nonrelational databases use them too.
- B-trees break the database down into fixed-size blocks or pages, traditionally 4KB in size (sometimes bigger), and read or write one page at a time. This design corresponds more closely to the underlying hardware, as disks are also arranged in fixed size blocks. Each page can be identified using an address or location, which allows one page to refer to another - similar to a pointer, but on disk instead of in memory. We can use these pages to construct a tree of pages.
- One page is designated as the root of the B-tree; whenever you want to look up a key in the index, you start here. The page contains several keys and references to child pages. Each child is responsible for a continuous range of keys, and the keys between the references indicate where the boundaries between those keys lie.
- The number of references to child pages in one page of the B-tree is called the branching factor.
- LSM-trees are typically faster for writes, whereas B-trees are thought to be faster for reads.
Other Indexing Structures
- The indexing structures discussed so far are likely primary key indexes in the relational model. A primary key identifies one row in a relational table, or one document in a document database, or one vertex in a graph database.
- It is also very common to have secondary indexes. A secondary index can be constructed from a key-value index. The main difference is that keys are not unique - there might be many rows with the same key.
- The key in an index is the thing that queries search for, but the value can be one of two things: it could be the actual row in question, or it could be a reference to the row. In the latter case, the place where rows are stored is known as a heap file, and it stores data in no particular order (it may be append-only, or it may keep track of deleted rows in order to overwrite them with new data later).
- Clustered Index - stores the indexed row directly within the index
- A compromise between a clustered index (storing all row data within the index) and a nonclustered index (storing only references to the data within the index) is known as a covering index or index with included columns, which stores some of a table's columns within the index.
- Again these indexes can speed up reads but add overhead to writes
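A small sketch of a secondary index as a key-value structure whose keys are not unique (the row data and column names are made up):

rows = {   # primary key -> row (think of this as the heap file / primary index)
    1: {"name": "alice", "city": "Boston"},
    2: {"name": "bob", "city": "Boston"},
    3: {"name": "carol", "city": "Denver"},
}

def build_secondary_index(rows, column):
    index = {}
    for primary_key, row in rows.items():
        index.setdefault(row[column], []).append(primary_key)   # non-unique keys
    return index

city_index = build_secondary_index(rows, "city")
print(city_index["Boston"])                       # [1, 2] - references, not row data
print([rows[pk] for pk in city_index["Boston"]])  # follow the references to the rows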
Multi Column Indexes
- The most common type of multi-column index is called a concatenated index, which simply combines several fields into one key by appending one column to another (the index definition specifies in which order the fields are concatenated).
- Need to look into PostGIS indexes for geography search
Keep Everything in Memory
- In-memory databases - keeping everything in RAM
- In-memory databases get their performance advantage from the fact that they avoid the overheads of encoding in-memory data structures in a form that can be written to disk.
Transaction Processing or Analytics
- Transaction processing just means allowing clients to make low-latency reads and writes - as opposed to batch processing jobs, which only run periodically.
- SQL turns out to work well for both OLTP-type queries and OLAP-type queries.
- Data warehouse - the database where companies run their data analytics jobs. This is a separate database where business analysts can run their analytics queries without affecting OLTP database performance.
- OLTP systems expect high availability, which could be harmed by expensive analytic queries that scan large parts of the dataset.
- The process of getting data from OLTP database to data warehouse is known as Extract-Transform-Load (ETL)
- The idea behind column-oriented storage is simple: don't store all the values from one row together, but store all the values from each column together instead. If each column is stored in a separate file, a query only needs to read and parse those columns that are used in that query, which can save a lot of work.
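A tiny sketch of the same made-up data laid out row-wise and column-wise; an aggregate over one column only has to touch one list in the columnar layout:

row_store = [
    {"date": "2024-01-01", "product": "apple", "quantity": 3},
    {"date": "2024-01-01", "product": "banana", "quantity": 5},
    {"date": "2024-01-02", "product": "apple", "quantity": 2},
]

column_store = {                       # same facts, one list (file) per column
    "date": ["2024-01-01", "2024-01-01", "2024-01-02"],
    "product": ["apple", "banana", "apple"],
    "quantity": [3, 5, 2],
}

print(sum(row["quantity"] for row in row_store))   # 10 - touches every whole row
print(sum(column_store["quantity"]))               # 10 - reads only the quantity column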
Encoding and Evolution
- rolling upgrade (staged rollout) - deploying the new version of a code base to a few nodes at a time, checking whether the new version is running smoothly, and gradually working your way through all the nodes
- Backward Compatibility
- Newer code can read data that was written by older code
- Forward Compatibility
- Older code can read data that was written by newer code
- Programmers usually work with data in (at least) two different representations:
- In memory, data is kept in objects, structs, lists, arrays, hash tables, trees, and so on. These data structures are optimized for efficient access and manipulation by the CPU
- When you want to write data to a file or send it over the network, you have to encode it as some kind of self-contained sequence of bytes (for example, a JSON document). Since a pointer wouldn't make sense to any other process, this sequence of bytes representation looks quite different from the data structures that are normally used in memory.
- The translation from the in-memory representation to a byte sequence is called encoding (also known as serialization or marshalling), and the reverse is called decoding (parsing, deserialization, unmarshalling). A small encoding/decoding sketch follows this list.
- Look into Apache Avro for an example of a Serialization Framework
- In a database, the process that writes to the database encodes the data, and the process that reads from the database decodes it. There may just be a single process accessing the database, in which case the reader is simply a later version of the same process - in that case you can think of storing something in the database as sending a message to your future self.
- RPC Call
- REST
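A minimal sketch of the encoding/decoding terminology above, using JSON as the self-contained byte-sequence format (the object itself is made up for illustration):

import json

user = {"id": 42, "name": "Alice", "interests": ["databases", "hiking"]}

encoded = json.dumps(user).encode("utf-8")        # encoding / serialization / marshalling
print(type(encoded), len(encoded), "bytes")

decoded = json.loads(encoded.decode("utf-8"))     # decoding / deserialization / unmarshalling
assert decoded == user                            # round-trips back to the in-memory form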
Distributed Data
- What happens if multiple machines are involved in storage and retrieval of data?
- Reasons you may want to distribute a database across multiple machines:
- Scalability
- If your data volume, read load, or write load grows bigger than a single machine can handle, you can potentially spread the load across multiple machines
- Fault Tolerance / High Availability
- If your application needs to continue working even if one machine (or several machines, or the network, or an entire datacenter) goes down, you can use multiple machines to give you redundancy. When one fails, another one can take over.
- Latency
- If you have users around the world, you want to have servers at various locations worldwide so that each user can be served from a datacenter that is geographically close to them. That avoids users having to wait for network packets to travel halfway around the world.
- Shared Memory Architecture - vertical scaling
- A machine with twice as many CPUs, twice as much RAM, and twice as much disk capacity. Costs grow faster than linearly.
- Shared-Disk Architecture
- Several machines with independent CPUs and RAM that share an array of disks, connected via a fast network.
- Shared-Nothing Architecture - horizontal scaling
- Each machine or virtual machine running the database software is called a node. Each node uses its CPUs, RAM, and disks independently. Any coordination between nodes is done at the software level, using a conventional network.
- We will focus on shared-nothing architectures because they have the most complexity and require the most caution.
- There are two common ways data is distributed across multiple nodes:
- Replication
- Keeping a copy of the same data on several different nodes, potentially in different locations. Replication provides redundancy: if some nodes are unavailable, the data can still be served from the remaining nodes. Replication can also help improve performance
- Partitioning
- Splitting a big database into smaller subsets called partitions so that different partitions can be assigned to different nodes (also known as sharding).
Replication
- Replication means keeping a copy of the same data on multiple machines that are connected via a network. There are several reasons to replicate data:
- To keep data geographically close to your users
- To allow the system to continue working even if some of its parts have failed (and thus increase availability)
- To scale out the number of machines that can serve read queries (and thus increase read throughput)
- All the difficulty in replication lies in handling changes to replicated data.
- Each node that stores a copy of the database is called a replica. Every write to the database needs to be processed by every replica. The most common solution is called leader-based replication (a small replication sketch appears at the end of this section).
- One of the replicas is designated the leader. When clients want to write to the database, they must send their requests to the leader, which first writes the new data to its local storage.
- The other replicas are known as followers (read replicas, slaves, secondaries, or hot standbys). Whenever the leader writes new data to its local storage, it also sends the data change to all of its followers as part of a replication log or change stream. Each follower takes the log from the leader and updates its local copy of the database accordingly, by applying all writes in the same order as they are processed on the leader.
- When a client wants to read from the database, it can query either the leader or any of the followers. However, writes are only accepted on the leader (the followers are read-only from the client's point of view).
- This mode is a built-in feature of many relational databases, such as PostgreSQL.
- An important detail of a replicated system is whether the replication happens synchronously or asynchronously. In relational databases, this is often a configurable option; other systems are often hardcoded to be either one or the other.
- Normally, replication is quite fast: most databases apply changes to followers in less than a second. However, there is no guarantee of how long it might take.
- The advantage of synchronous replication is that the follower is guaranteed to have an up-to-date copy of the data that is consistent with the leader. The disadvantage is that if the synchronous follower doesn't respond (because it has crashed or there is a network fault or for any other reason), the write cannot be processed. The leader must block all writes and wait until the synchronous replica is available again. For that reason, it is impractical for all followers to be synchronous: any one node outage would cause the whole system to grind to a halt.
- In practice, if you enable synchronous replication on a database, it usually means that one of the followers is synchronous, and the others are asynchronous. If the synchronous follower becomes unavailable or slow, one of the asynchronous followers is made synchronous. This guarantees that you have an up-to-date copy of the data on at least two nodes: the leader and one synchronous follower. This configuration is sometimes also called semi-synchronous.
- Process for setting up a new follower:
- Take a consistent snapshot of the leader's database at some point in time - if possible, without taking a lock on the entire database. Most databases have this feature, as it is also required for backups.
- Copy the snapshot to the new follower node.
- The follower connects to the leader and requests all the data changes that have happened since the snapshot was taken. This requires that the snapshot is associated with an exact position in the leader's replication log.
- When the follower has processed the backlog of data changes since the snapshot, we say it has caught up. It can now continue to process data changes from the leader as they happen.
- failover - the process of handling the failure of the leader. The process can be automated or manual. An automated process looks like:
- Determining that the leader has failed
- Choosing a new leader
- Reconfiguring the system to use the new leader
- Read-after-write consistency, also known as read-your-writes consistency: this is a guarantee that if the user reloads the page, they will always see any update they submitted themselves. It makes no promises about other users: their updates may not be visible until some later time.
- One way to implement this: when reading something that the user may have modified, read it from the leader.
- Monotonic reads guarantee that after a user has seen a newer version of the data, they won't later see an older version - reads never appear to move backward in time
- Consistent Prefix Reads - a guarantee that if a sequence of writes happens in a certain order, then anyone reading those writes will see them appear in the same order
- Multiple Datacenters, Multi-Leader Replication
- Performance
- Tolerance of Datacenter outages
- tolerance of network problems
- A replication topology describes the communication paths along which writes are propagated from one node to another
- Replication can serve several purposes:
- High Availability
- Keeping the system running, even when one machine (or several machines, or an entire datacenter) goes down
- Disconnected Operation
- Allowing an application to continue working even when there is a network interruption
- Latency
- Placing data geographically close to users, so that users can interact with it faster
- Scalability
- Being able to handle a higher volume of reads than a single machine could handle, by performing reads on replicas
- Replication requires carefully thinking about concurrency and about all the things that can go wrong, and dealing with the consequences of those faults. At a minimum, we need to deal with unavailable nodes and network interruptions.
- Three main approaches to replication:
- Single-leader Replication
- Clients send all writes to a single node (the leader), which sends a stream of data change events to the other replicas (followers). Reads can be performed on any replica, but reads from followers might be stale.
- Popular because it is fairly easy to understand and there is no conflict resolution to worry about
- Multi-Leader Replication
- Clients send each write to one of several leader nodes, any of which can accept writes. The leaders send streams of data change events to each other and to follower nodes.
- More robust in terms of presence of faulty nodes, network interruptions, and latency spikes - at the cost of being harder to reason about and providing only very weak consistency guarantees
- Leaderless Replication
- Clients send each write to several nodes, and read from several nodes in parallel in order to detect and correct nodes with stale data.
- More robust in terms of presence of faulty nodes, network interruptions, and latency spikes - at the cost of being harder to reason about and providing only very weak consistency guarantees
- Replication can be synchronous or asynchronous, which has a profound effect on the system behavior when there is a fault. Although asynchronous replication can be fast when the system is running smoothly, it's important to figure out what happens when replication lag increases and servers fail. If a leader fails and you promote an asynchronously updated follower to be the new leader, recently committed data may be lost.
- Consistency Models which are helpful for deciding how an application should behave under replication lag:
- Read-after-write consistency
- Users should always see data that they submitted themselves
- Monotonic reads
- After users have seen the data at one point in time, they shouldn't later see the data from some earlier point in time
- Consistent prefix reads
- Users should see the data in a state that makes causal sense: for example, seeing a question and its reply in the correct order
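To tie the leader/follower mechanics above together, here is a minimal single-leader replication sketch (my own illustration; real systems ship the replication log asynchronously over the network rather than calling followers directly):

class Replica:
    def __init__(self):
        self.data = {}
        self.applied = 0                        # how far into the replication log we've applied

    def apply(self, log):
        for key, value in log[self.applied:]:   # apply writes in the leader's order
            self.data[key] = value
        self.applied = len(log)

class Leader(Replica):
    def __init__(self, followers):
        super().__init__()
        self.log = []                           # the replication log / change stream
        self.followers = followers

    def write(self, key, value):                # all writes go through the leader
        self.log.append((key, value))
        self.apply(self.log)
        for follower in self.followers:         # would be asynchronous in practice
            follower.apply(self.log)

followers = [Replica(), Replica()]
leader = Leader(followers)
leader.write("x", 1)
print(followers[0].data["x"])   # 1 - reads can go to any replica once it has caught up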
Partitioning
- For very large datasets, having multiple copies of the same data on different nodes is not sufficient: you also need to break the data up into partitions, a process known as sharding.
- Normally, partitions are defined in such a way that each piece of data (each record, row, or document) belongs to exactly one partition. In effect, each partition is a small database of its own, although the database may support operations that touch multiple partitions at the same time.
- The main reason for wanting to partition is scalability. Different partitions can be placed on different nodes in a shared-nothing cluster. Thus, a large dataset can be distributed across many disks, and the query load can be distributed across many processors.
- Partitioning is usually combined with replication so that copies of each partition are stored on multiple nodes.
- Partitioning by Key Range: One way of partitioning is to assign a continuous range of keys (from some minimum to some maximum) to each partition, like the volumes of a paper encyclopedia. Certain access patterns can lead to hot spots (a partitioning sketch follows this list).
- Partitioning by Hash of Key: Many distributed datastores use a hash function to determine the partition for a given key
- The goal of partitioning is to spread the data and query load across multiple machines, avoiding hot spots (nodes with proportionally high load). This requires choosing a partitioning scheme that is appropriate to your data, and rebalancing the partitions when nodes are added to or removed from the cluster.
- Main Approaches to partitioning:
- Key Range Partitioning
- Where keys are sorted, and a partition owns all the keys from some minimum up to some maximum. Sorting has the advantage that efficient range queries are possible, but there is a risk of hot spots if the application often accesses keys that are close together in the sorted order
- In this approach, partitions are typically rebalanced dynamically by splitting the range into two subranges when a partition gets too big
- Hash Partitioning
- Where a hash function is applied to each key, and a partition owns a range of hashes. This method destroys the ordering of keys, making range queries inefficient, but may distribute load more evenly
- When partitioning by hash, it is common to create a fixed number of partitions in advance, to assign several partitions to each node, and to move entire partitions from one node to another when nodes are added or removed. Dynamic partitioning can also be used
- Hybrid approaches are also possible.
- Interaction between partitioning and secondary indexes: a secondary index also needs to be partitioned, and there are two methods:
- Document partitioned indexes (local indexes), where the secondary indexes are stored in the same partition as the primary key and value. This means that only a single partition needs to be updated on write, but a read of the secondary index requires a scatter/gather across all partitions.
- Term-partitioned indexes (global indexes), where the secondary indexes are partitioned separately, using the indexed values. An entry in the secondary index may include records from all partitions of the primary key. When a document is written, several partitions of the secondary index need to be updated; however, a read can be served from a single partition.
- Techniques for routing queries to the appropriate partition:
- Partition-aware load balancing
- Sophisticated parallel query execution engines
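A small sketch contrasting key-range and hash partitioning (the boundaries, partition count, and hash choice are all illustrative):

import hashlib

RANGE_BOUNDARIES = ["g", "n", "t"]   # partition 0: keys < "g", 1: < "n", 2: < "t", 3: the rest

def key_range_partition(key):
    for i, boundary in enumerate(RANGE_BOUNDARIES):
        if key < boundary:
            return i
    return len(RANGE_BOUNDARIES)

NUM_PARTITIONS = 4

def hash_partition(key):
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % NUM_PARTITIONS   # destroys key ordering but spreads load more evenly

for key in ["apple", "melon", "zebra"]:
    print(key, key_range_partition(key), hash_partition(key))
# key-range keeps adjacent keys together (good for range scans, risk of hot spots);
# hash partitioning scatters them (even load, but range queries must hit every partition)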
By design, every partition operates mostly independently - that's what allows a partitioned database to scale to multiple machines. However, operations that need to write to several partitions can be difficult to reason about.
I'm just getting the summary from here on out. I am pretty tired of reading about all of this and there are managed services out there that can solve these types of problems for you.
Transactions
- Many things can go wrong in data systems:
- the database software or hardware may fail at any time
- application may crash at any time
- Interruptions in the network can unexpectedly cut off the application from the database, or one database node from another
- Several clients may write to the database at the same time, overwriting each other's changes
- A client may read data that doesn't make sense because it has only partially been updated
- Race conditions between clients can cause surprising bugs
- For decades, transactions have been the mechanism of choice for simplifying these issues. A transaction is a way for an application to group several reads and writes together into a logical unit. Conceptually, all the reads and writes in a transaction are executed as one operation: either the entire transaction succeeds (commit) or it fails (abort, rollback). With transactions, error handling becomes much simpler for an application, because it doesn't need to worry about partial failure - the case where some operations succeed and others fail.
- Transactions are an abstraction layer that allows an application to pretend that certain concurrency problems and certain kinds of hardware and software faults don't exist. A large class of errors is reduced down to a simple transaction abort, and the application just needs to try again.
- Without transactions, data can become inconsistent in various ways.
- Isolation levels of concurrency control: read committed, snapshot isolation, and serializable.
- Examples of race conditions:
- Dirty Reads
- One client reads another client's writes before they have been committed. The read committed isolation level and stronger levels prevent dirty reads.
- Dirty Writes
- One client overwrites data that another client has written, but not yet committed. Almost all transaction implementations prevent dirty writes.
- Read Skew (nonrepeatable reads)
- A client sees different parts of the database at different points in time. This issue is most commonly prevented with snapshot isolation, which allows a transaction to read from a consistent snapshot at one point in time. It is usually implemented with multi-version concurrency control (MVCC).
- Lost Updates
- Two clients concurrently perform a read-modify-write cycle. One overwrites the other's write without incorporating its changes, so data is lost. Some implementations of snapshot isolation prevent this anomaly automatically, while others require a manual lock (SELECT FOR UPDATE). A compare-and-set sketch appears at the end of this section.
- Write Skew
- A transaction reads something, makes a decision based on the value it saw, and writes the decision to the database. However, by the time the write is made, the premise of the decision is no longer true. Only serializable isolation prevents this anomaly.
- Phantom Reads
- A transaction reads objects that match some search condition. Another client makes a write that affects the results of that search. Snapshot isolation prevents straightforward phantom reads, but phantoms in the context of write skew require special treatment, such as index-range locks.
- Weak isolation levels protect against some of the above anomalies but leave others for you to handle manually. Only serializable isolation protects against all of these issues. Three approaches to implementing serializable transactions:
- Literally executing transactions in a serial order
- If you can make each transaction very fast to execute, and the transaction throughput is low enough to process on a single CPU core, this is a simple and effective option
- Two-phase locking
- For decades, this has been the standard way of implementing serializability, but many applications avoid using it because of performance characteristics.
- Serializable Snapshot Isolation (SSI)
- A fairly new algorithm that avoids most of the downsides of the previous approaches. It uses an optimistic approach, allowing transactions to proceed without blocking. When a transaction wants to commit, it is checked, and it is aborted if the execution was not serializable.
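As an illustration of the lost-update problem and the compare-and-set style of fix mentioned above, here is a toy sketch. The "database" is just a dict guarded by a lock; a real database would provide the atomic operation itself:

import threading

db = {"counter": 0}
lock = threading.Lock()

def compare_and_set(key, expected, new):
    # atomic: succeeds only if nobody changed the value since we read it
    with lock:
        if db[key] == expected:
            db[key] = new
            return True
        return False

def increment(key):
    while True:                                          # retry the read-modify-write until it wins
        current = db[key]                                # read
        if compare_and_set(key, current, current + 1):   # modify + write atomically
            return

threads = [threading.Thread(target=increment, args=("counter",)) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(db["counter"])   # 10 - a naive unsynchronized read-modify-write could lose increments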
The Trouble with Distributed Systems
- Assume that everything goes wrong.
- Kinds of problems that can occur in distributed systems:
- Whenever you send a packet over the network, it may be lost or get arbitrarily delayed. Likewise, the reply may be lost or delayed, so if you don't get a reply, you have no idea whether the message got through.
- A node's clock may be significantly out of sync with other nodes (despite your best efforts to set up NTP), it may suddenly jump forward or back in time and relying on it is dangerous because you most likely don't have a good measure of your clock's error interval.
- A process may pause for a substantial amount of time in its execution (perhaps due to a stop-the-world garbage collector), be declared dead by other nodes, and then come back to life again without realizing that it was paused.
- The fact that such partial failures can occur is the defining characteristic of distributed systems. Whenever software tries to do anything involving other nodes, there is the possibility that it may occasionally fail, or randomly go slow, or not respond at all. In distributed systems, we try to build tolerance of partial failures into software, so that the system as a whole may continue functioning even when some of its constituent parts are broken.
- To tolerate faults, the first step is to detect them. Most distributed algorithms rely on timeouts to determine whether a remote node is still available.
- Fault tolerance and low latency (by placing data geographically close to users) are equally important goals, and those things cannot be achieved with a single node.
Consistency and Consensus
- Tolerating faults - keeping the service functioning correctly, even if some internal component is faulty
- The best way of building fault-tolerant systems is to find some general-purpose abstractions with useful guarantees, implement them once, and then let applications rely on those guarantees.
- One of the most important abstractions for distributed systems is consensus - getting all of the nodes to agree on something.
- linearizability - a popular consistency model; its goal is to make replicated data appear as though there were only a single copy and to make all operations act on it atomically. Linearizability makes the DB behave like a variable in a single threaded program, but it has the downside of being slow, especially in environments with large network delays.
- Causality - imposes an ordering system on events in a system. Causality provides us with a weaker consistency model: some things can be concurrent, so the version history is like a timeline with branching and merging. Causality is less sensitive to network problems.
- Achieving consensus means deciding something in such a way that all nodes agree on what was decided, and such that the decision is irrevocable. A wide range of problems are reducible to consensus and are equivalent to each other. Some equivalent problems include:
- Linearizable compare-and-set registers
- Atomic transaction commit
- Total order broadcast
- Locks and leases
- Membership/coordination service
- Uniqueness constraint
- Although a single-leader database can provide linearizability without executing a consensus algorithm on every write, it still requires consensus to maintain its leadership and for leadership changes.
Derived Data
- Applications commonly use a combination of several different datastores, indexes, caches, analytics systems, etc. and implement mechanisms for moving data from one store to another.
- On a high level, systems that store and process data can be grouped into two broad categories:
- Systems of record
- A system of record, also known as source of truth, holds the authoritative version of your data. When new data comes in, e.g., as user input, it is first written here. Each fact is represented exactly once (the representation is typically normalized). If there is any discrepancy between another system and the system of record, then the value in the system of record is (by definition) the correct one.
- Derived data systems
- Data in a derived system is the result of taking some existing data from another system and transforming or processing it in some way. If you lose derived data, you can recreate it from the original source. A classic example is a cache: data can be served from the cache if present, but if the cache doesn't contain what you need, you can fall back to the underlying database. Denormalized values, indexes, and materialized views also fall into this category. In recommendation systems, predictive summary data is often derived from usage logs.
- Technically speaking, derived data is redundant, in the sense that it duplicates existing information. However, it is often essential for getting good performance on read queries. It is commonly denormalized. You can derive several different datasets from a single source, enabling you to look at the data from different points of view.
- Most databases, storage engines, and query languages are not inherently either a system of record or a derived system - it depends on how you use it.
Batch Processing
- Distinguishing between different types of systems:
- Services (online systems)
- A service waits for a request or instruction from a client to arrive. When one is received, the service tries to handle it as quickly as possible and sends a response back. Response time is usually the primary measure of performance of a service, and availability is often very important (if the client can't reach the service, the users will probably get an error message).
- Batch Processing Systems (offline systems)
- A batch processing system takes a large amount of input data, runs a job to process it, and produces some output data. Jobs often take a while (from a few minutes to several days), so there normally isn't a user waiting for the job to finish. Instead, batch jobs are often scheduled to run periodically (for example, once a day). The primary performance measure of a batch job is usually throughput (the time it takes to crunch through an input dataset of a certain size). We discuss batch processing in this chapter.
- Stream Processing Systems (near-real-time systems)
- Stream processing is somewhere between online and offline/batch processing (so it is sometimes called near-real-time or nearline processing). Like a batch processing system, a stream processor consumes inputs and produces outputs (rather than responding to requests). However, a stream job operates on events shortly after they happen, whereas a batch job operates on a fixed set of input data. This difference allows stream processing systems to have lower latency than the equivalent batch systems.
- The two main problems that distributed batch processing frameworks need to solve are:
- Partitioning
- In MapReduce, mappers are partitioned according to input file blocks. The output of mappers is repartitioned, sorted, and merged into a configurable number of reducer partitions. The purpose of this process is to bring all the related data - e.g., all the records with the same key - together in the same place.
- Post-MapReduce dataflow engines avoid sorting unless it is required, but they otherwise take a broadly similar approach to partitioning.
- Fault tolerance
- MapReduce frequently writes to disk, which makes it easy to recover from an individual failed task without restarting the entire job but slows down execution in the failure-free case. Dataflow engines perform less materialization of intermediate state and keep more in memory, which means that they need to recompute more data if a node fails. Deterministic operators reduce the amount of data that needs to be recomputed.
- Several join algorithms for MapReduce:
- Sort-merge joins
- Each of the inputs being joined goes through a mapper that extracts the join key. By partitioning, sorting, and merging, all records with the same key end up going to the same call of the reducer. The reducer can then output the joined records.
- Broadcast hash joins
- One of the two join inputs is small, so it is not partitioned and it can be entirely loaded into a hash table. Thus, you can start a mapper for each partition of the large join input, load the hash table for the small input into each mapper, and then scan over the large input one record at a time, querying the hash table for each record.
- Partitioned hash joins
- If the two join inputs are partitioned in the same way (using the same key, same hash function, and same number of partitions), then the hash table approach can be used independently for each partition.
- Distributed batch processing engines have a deliberately restricted programming model: callback functions (such as mappers and reducers) are assumed to be stateless and to have no externally visible side effects besides their designated output.
- The distinguishing feature of a batch processing job is that it reads some input data and produces some output data, without modifying the input - in other words, the output is derived from the input.
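A quick sketch of the broadcast hash join idea with made-up data: the small input becomes an in-memory hash table, and the large input is scanned record by record:

users = {                          # small input, loaded entirely into a hash table
    1: {"name": "alice"},
    2: {"name": "bob"},
}

activity_events = [                # large input, scanned one record at a time
    {"user_id": 1, "url": "/home"},
    {"user_id": 2, "url": "/cart"},
    {"user_id": 1, "url": "/checkout"},
]

joined = [
    {**event, "name": users[event["user_id"]]["name"]}   # probe the hash table per record
    for event in activity_events
    if event["user_id"] in users
]
print(joined)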
Stream Processing
- The problem with occasional batch processing is that changes in the input are only reflected in the output some time later, which is too slow for many impatient users. To reduce the delay, we can run the processing more frequently - say, processing a second's worth of data at the end of every second - or even continuously, abandoning the fixed time slices entirely and simply processing every event as it happens. This is the idea behind stream processing.
- A stream refers to data that is incrementally made available over time. The concept appears in many places.
- Stream processing is similar to batch processing, but done continuously on unbounded (never-ending) streams rather than on a fixed-size input.
- Two types of message brokers:
- AMQP/JMS-style message broker
- The broker assigns individual messages to consumers, and consumers acknowledge individual messages when they have been successfully processed. Messages are deleted from the broker once they have been acknowledged. This approach is appropriate as an asynchronous form of RPC.
- Log-based message broker
- The broker assigns all messages in a partition to the same consumer node, and always delivers messages in the same order. Parallelism is achieved through partitioning, and consumers track their progress by checkpointing the offset of the last message they have processed. The broker retains messages on disk, so it is possible to jump back and reread old messages if necessary.
- Purposes of stream processing: searching for event patterns (complex event processing), computing windowed aggregations (stream analytics), and keeping derived data systems up to date (materialized views).
- Three types of joins that may appear in stream processes:
- Stream-stream joins
- Both input streams consist of activity events, and the join operator searches for related events that occur within some window of time.
- Stream-table joins
- One input stream consists of activity events, while the other is a database change log. The changelog keeps a local copy of the database up to date. For each activity event, the join operator queries the database and outputs an enriched activity event.
- Table-table joins
- Both input streams are database changelogs. In this case, every change on one side is joined with the latest state of the other side. The result is a stream of changes to the materialized view of the join between the two tables.
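A minimal sketch of the log-based broker described above: messages are appended to a retained log, and each consumer tracks its own offset and can rewind to reread old messages (the class names are my own):

class LogPartition:
    def __init__(self):
        self.log = []                        # messages are retained (on disk in a real broker)

    def append(self, message):
        self.log.append(message)
        return len(self.log) - 1             # offset of the new message

    def read(self, offset, max_messages=10):
        return self.log[offset:offset + max_messages]

class Consumer:
    def __init__(self, partition):
        self.partition = partition
        self.offset = 0                      # checkpointed progress through the log

    def poll(self):
        messages = self.partition.read(self.offset)
        self.offset += len(messages)         # checkpoint the new offset
        return messages

partition = LogPartition()
for event in ["page_view", "click", "purchase"]:
    partition.append(event)

consumer = Consumer(partition)
print(consumer.poll())    # ['page_view', 'click', 'purchase']
consumer.offset = 0       # rewind and reread old messages if necessary
print(consumer.poll())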