Tkrzw-RPC: RPC interface of Tkrzw
Overview
Tkrzw-RPC is a package containing a server program which manages databases of Tkrzw and a library to access the service via the gRPC protocol. Tkrzw is a library to manage key-value storage with various algorithms. With Tkrzw, applications can handle database files efficiently within the process, without any network overhead. However, multiple processes cannot open the same database file simultaneously. Tkrzw-RPC solves this issue with a server program which manages database files and allows other processes to access their contents via RPC.
One server process can handle multiple databases, each of which can have a different data structure and tuning parameters. Each database is an instance of the PolyDBM class, which is an adapter class to handle the following classes with the same interface.
- HashDBM : File database manager implementation based on hash table.
- TreeDBM : File database manager implementation based on B+ tree.
- SkipDBM : File database manager implementation based on skip list.
- TinyDBM : On-memory database manager implementation based on hash table.
- BabyDBM : On-memory database manager implementation based on B+ tree.
- CacheDBM : On-memory database manager implementation with LRU deletion.
The library of Tkrzw-RPC provides C++ components to access the database service via the gRPC protocol. Thus, you can easily write application programs in C++. You use the RemoteDBM class, whose API is very similar to that of PolyDBM, so you can use it as if you were operating on local databases. Moreover, gRPC generates client interfaces in various languages automatically, based on the service definition in the protocol buffers file tkrzw_rpc.proto. Tkrzw-RPC also provides command line utilities to access the database service. Separate packages of client libraries for Java, Python, Ruby, and Go are also provided.
The server supports asynchronous replication for high availability of the service. Every update on the databases of a server called the "master" can be monitored by another server called a "slave", and the updates are applied to the databases on the slave immediately. Thus, the databases on both servers are synchronized. If the master dies, clients connect to the slave to continue the service without any downtime. The slave can then be treated as the new master by adding another slave. Slaves can also be used as read-only replicas for load distribution. The "dual masters" topology is also available, where two servers replicate each other.
Download
You can download source packages in the following directories.
- C++ source packages
- Java client library packages
- Python client library packages
- Ruby client library packages
- Go client library packages
API Documents
The following are API documents for each language.
Installation
Tkrzw-RPC is implemented based on the C++17 standard and POSIX API. It works on Unix-like systems (Linux, FreeBSD, and Mac OS X), and GCC (version 7.3 or later) is required to build programs.
Download the latest version of the source packages. The core library of gRPC is also required, so install it beforehand.
To build the server and the library, you will usually run the following commands. If you have installed Tkrzw and gRPC into a location other than "/usr/local", specify the prefix with the "--with-extra" option of "./configure", such as "--with-extra=/opt/homebrew".
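For example, a typical build and installation might look like the following (the install step usually needs root privileges):

./configure
make
sudo make install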
On some UNIX-like systems including FreeBSD and Solaris, "make" defaults to bsdmake or other non-GNU make, so you might need to replace "make" with "gmake" in the above instructions.
By default, the library and related files are installed under "/usr/local". If you want to install the files under "/usr", specify "--prefix=/usr" with the configure script.
To perform the integration tests, run these commands on two separate terminals.
To run the test suite, GoogleTest is required. Although testing the library is not necessary for most users, you can do so just in case.
You can run end-to-end tests with the following procedures. First, run the server.
While the server is running, run the client commands on another terminal.
If the message "Checking completed" is shown, all checks are OK. You can stop the server process by inputting Ctrl-C on the same terminal.
Kickstart
Let's run the server with a simple setting. By default, an on-memory database of TinyDBM is used. By setting the log level to "debug", every RPC call to the service is printed in the log stream.
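For example, assuming the server binary is installed on the PATH:

tkrzw_server --log_level debug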
Let's play with the command line utilities on another terminal.
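For instance, you might set, retrieve, and list records like this (keys and values are arbitrary):

tkrzw_dbm_remote_util set one hop
tkrzw_dbm_remote_util get one
tkrzw_dbm_remote_util list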
Stop the server by inputting Ctrl-C on the terminal of the server command. As a result, such logs as the following are printed.
As the on-memory database was not associated with any file, the records in the database vanished when the server shut down. Next, let's run the server with two on-memory databases associated with files. The first database is a TinyDBM, which is an on-memory hash database. The second database is a BabyDBM, which is an on-memory tree database.
Let's add several records to each database. The target of the operation is specified by the "--index" option, whose default value is zero, referring to the first database.
Stop the server by inputting Ctrl-C on the terminal of the server command. This time, all records are stored in the specified files. Then, restart the server with the same arguments.
Confirm that the records are loaded properly. As the second database is an ordered database, the record keys are listed in ascending order.
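A possible invocation, using the documented extension rules (".tkmt" for TinyDBM and ".tkmb" for BabyDBM; the file names are illustrative):

tkrzw_server --log_level debug "casket-hash.tkmt" "casket-tree.tkmb"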
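For example (keys and values are arbitrary):

tkrzw_dbm_remote_util set --index 0 one hop
tkrzw_dbm_remote_util set --index 0 two step
tkrzw_dbm_remote_util set --index 1 three jump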
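For example, listing the second database should print its records with the keys in ascending order:

tkrzw_dbm_remote_util list --index 1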
Read the C++ API documents for usage of the C++ client library. Then, let's build a sample program. Make a file of the following C++ code and save it as "helloworld.cc".
To build an application program, you'll typically run a command like this. The compiler flag "-std=c++17" is necessary if the default C++ version of your compiler is older than C++17.
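A sketch of such a build command, using the build utility described below to supply the preprocessor and linker flags (the source file name comes from the sample above):

g++ -std=c++17 $(tkrzw_rpc_build_util config -i) -o helloworld helloworld.cc $(tkrzw_rpc_build_util config -l)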
The bundled command "tkrzw_rpc_build_util" is useful to know the necessary CPPFLAGS (flags for the preprocessor) and LDFLAGS (flags for the linker).
In some environments, you can also use the "pkg-config" command to list those flags.
Build the above sample program and run it while the server is running.
Command Line Interfaces
tkrzw_rpc_build_util: Build Utilities
tkrzw_rpc_build_util is a command for build utilities. With it, you can obtain the flags to build your own programs using the Tkrzw-RPC library. The usage is the following. A subcommand name comes as the first argument. Some options and other arguments can follow.
tkrzw_rpc_build_util config [options]
- Prints configurations.
tkrzw_rpc_build_util version
- Prints the version information.
- Options of the config subcommand:
-v : Prints the version number.
-i : Prints C++ preprocessor options for build.
-l : Prints linker options for build.
-p : Prints the prefix for installation.
The following is a way to build your own application without setting complicated flags.
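For example, embedding the output of the config subcommand into the compiler command line (the application file name is illustrative):

CPPFLAGS=$(tkrzw_rpc_build_util config -i)
LDFLAGS=$(tkrzw_rpc_build_util config -l)
g++ -std=c++17 $CPPFLAGS -o myapp myapp.cc $LDFLAGS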
tkrzw_server: Database Server
tkrzw_server is a command to run the database service of Tkrzw-RPC. One server process can handle multiple databases at the same time. The process can run as a daemon in the background. The server can use IPv4, IPv6, or a UNIX domain socket as the network interface.
tkrzw_server [options] [db_configs]
- Runs the database service.
- Options:
--version : Prints the version number and exits.
--address str : The address/hostname and the port of the server. (default: 0.0.0.0:1978)
--auth configs : Enables authentication with the configuration.
--async : Uses the asynchronous API on the server.
--threads num : The maximum number of worker threads. (default: 1)
--log_file str : The file path of the log file. (default: /dev/stdout)
--log_level str : The minimum log level to be stored: debug, info, warn, error, fatal. (default: info)
--log_date str : The log date format: simple, simple_micro, w3cdtf, w3cdtf_micro, rfc1123, epoch, epoch_micro. (default: simple)
--log_td num : The log time difference in seconds. (default: 99999=local)
--server_id num : The server ID. (default: 1)
--ulog_prefix str : The prefix of the update log files.
--ulog_max_file_size num : The maximum file size of each update log file. (default: 1Gi)
--repl_ts_file str : The replication timestamp file.
--repl_ts_from_dbm : Uses the database timestamp if the timestamp file doesn't exist.
--repl_ts_skew num : Skews the timestamp by a value.
--repl_wait num : The time in seconds to wait for the next log. (default: 1)
--pid_file str : The file path of the file to store the process ID.
--daemon : Runs the process as a daemon process.
--shutdown_wait num : The time in seconds to wait for the service to shut down gracefully. (default: 60)
--read_only : Opens the databases in read-only mode.
If you don't set any database configuration, an on-memory database of TinyDBM with the default tuning is served. You can set one or more database configurations too. Each configuration is in the format "path#name1=value1,name2=value2,...", which is composed of the file path of the database, "#", and CSV of tuning parameters. The extension of the database path determines the database class: ".tkh" for HashDBM, ".tkt" for TreeDBM, ".tks" for SkipDBM, ".tkmt" for TinyDBM, ".tkmb" for BabyDBM, and ".tkmc" for CacheDBM. The database path can be empty for on-memory databases. The "dbm" parameter overrides the decision by the extension. See PolyDBM for details. The following are samples.
- #dbm=BabyDBM
- Opens a BabyDBM without associating any file.
- casket#dbm=HashDBM
- Opens the file "casket" as a HashDBM.
- casket.tkh#num_buckets=10M,update_mode=appending,restore_mode=sync
- Opens the file "casket.tkh" as a HashDBM. The bucket number is set to 10 million. Uses the appending update mode and the synchronous restore mode.
- casket.tkh#num_buckets=100M,align_pow=8,cache_buckets=1,file=pos-para,block_size=512,access_options=direct:padding:pagecache
- Opens the file "casket.tkh" as a HashDBM. The bucket number is set to 100 million. The other parameters are tuned for direct I/O.
- casket.tkt#num_buckets=1M,max_cached_pages=100K,record_comp_mode=lz4
- Opens the file "casket.tkt" as a TreeDBM. The bucket number is set to 1 million. The maximum number of cached pages is 100 thousand. Pages are compressed with LZ4.
- casket.tks#sort_mem_size=2Gi,max_cached_records=100K
- Opens the file "casket.tkt" as a TreeDBM. The memory size for sorting is 2GiB. The maximum number of cached records is 100 thousand.
- casket.tkmt#num_buckets=10M
- Opens the file "casket.tkmt" as a TinyDBM. The bucket number is set to 10 million.
- casket.tkmb
- Opens the file "casket.tkmb" as a BabyDBM.
- casket.tkmc#cap_rec_num=10M,cap_mem_size=2Gi
- Opens the file "casket.tkmc" as a CacheDBM. The maximum number of records is 10 million. The total memory size to use is 2GiB.
Some tuning parameters of file databases (HashDBM, TreeDBM, SkipDBM) are embedded in the file. Thus, you can create a new database with tkrzw_dbm_util command with proper tuning parameters and specify it to tkrzw_server without tuning parameters. All tuning parameters of on-memory databases have to be set every time. Unit suffixes can be added to numeric values. "k" or "K" means 1,000 times, "ki" or "Ki" means 1024 times. "m" or "M" means 1,000,000 times. "mi" or "Mi" means 1,048,576 times. "g" or "G" means 1,000,000,000 times. "gi" or "Gi" means 1,073,741,824 times.
By default, the server address is "0.0.0.0:1978", which means that the socket is bound to all network interfaces of IPv4 and IPv6 on the machine and that the port number is 1978. To use a UNIX domain socket, specify the socket file path like "unix:/run/tkrzw_server.socket".
By default, the server uses the synchronous API of gRPC. If the number of clients is limited (say, 20 or less) and they don't call RPCs continuously, the maximum throughput of the server doesn't matter but minimal latency does. In such a case, using the synchronous API gives the best performance. Otherwise, you will pursue the maximum throughput of the server. Then, you should specify the "--async" option to use the asynchronous API. It enables the server to handle tens of thousands of connections at the same time and deliver throughput of more than 100 thousand QPS. The "--threads" option specifies the maximum number of worker threads used by the synchronous API, or the fixed number of queue-thread pairs used by the asynchronous API. Usually, the number of threads should be the same as the number of CPU cores. If you run clients on the same machine and they use much CPU time, the number of server threads should be smaller.
To finish the server process running in the foreground, input Ctrl-C on the terminal. If you run the server as a system service, run the process as a daemon with the "--daemon" option. To finish the daemon process, send a termination signal such as SIGTERM with the "kill" command. If a daemon process catches SIGHUP, the log file is re-opened. To send signals to the process, you have to know the process ID. So, it's a good practice to write the process ID to a file with the "--pid_file" option. Because the current directory of a daemon process is changed to the root directory, paths of related files should be specified as absolute paths.
The following command starts the database service as a daemon process. Usually, it is run by the start up script of the system.
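A sketch, assuming the binary, database, and working files live under conventional paths (adjust them to your system):

/usr/local/bin/tkrzw_server --daemon --pid_file /var/run/tkrzw_server.pid --log_file /var/log/tkrzw_server.log --log_level info "/var/lib/tkrzw/casket.tkh"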
The following command stops the database service gracefully. Usually, it is run by the the shutdown script of the system.
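For example, using the process ID recorded at startup:

kill -TERM $(cat /var/run/tkrzw_server.pid)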
For log rotation, you run the following script. Usually, it is run by a cron script invoked periodically.
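A minimal sketch: rename the current log file and send SIGHUP so that the daemon reopens the log file.

mv /var/log/tkrzw_server.log /var/log/tkrzw_server.log.$(date +%Y%m%d)
kill -HUP $(cat /var/run/tkrzw_server.pid)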
tkrzw_dbm_remote_util: Database Client
tkrzw_dbm_remote_util is a client command to invoke RPCs. Thereby, you can set records, retrieve records, remove records, and rebuild databases. A subcommand name comes as the first argument. Some options and other arguments can follow.
tkrzw_dbm_remote_util echo [options] [message]
- Invokes an echoing back test.
tkrzw_dbm_remote_util inspect [options] [attr]
- Prints inspection of a database file.
tkrzw_dbm_remote_util get [options] key
- Gets a record and prints it.
tkrzw_dbm_remote_util set [options] key value
- Sets a record.
tkrzw_dbm_remote_util remove [options] key
- Removes a record.
tkrzw_dbm_remote_util list [options]
- Lists up records and prints them.
tkrzw_dbm_remote_util queue [options] [value]
- Enqueues or dequeues a record.
tkrzw_dbm_remote_util clear [options]
- Removes all records.
tkrzw_dbm_remote_util rebuild [options] [params]
- Rebuilds a database file for optimization.
tkrzw_dbm_remote_util sync [options] [params]
- Synchronizes a database file.
tkrzw_dbm_remote_util search [options] pattern
- Searches the database for records matching a pattern and prints them.
tkrzw_dbm_remote_util changemaster [options] [master]
- Changes the master of replication.
tkrzw_dbm_remote_util replicate [options] [db_configs...]
- Replicates updates to local databases.
- Common options:
--version : Prints the version number and exits.
--address : The address and the port of the service. (default: localhost:1978)
--timeout : The timeout in seconds for connection and each operation.
--auth configs : Enables authentication with the configuration.
--index : The index of the DBM to access. (default: 0)
--multi : Calls xxxMulti methods for the get, set, and remove subcommands.
- Options for the set subcommand:
--no_overwrite : Fails if there's an existing record with the same key.
--append str : Appends the value at the end after the given delimiter.
--incr num : Increments the value with the given initial value.
- Options for the list subcommand:
--move type : The type of movement: first, jump, jumplower, jumplowerinc, jumpupper, jumpupperinc. (default: first)
--jump_key str : Specifies the jump key. (default: empty string)
--items num : The number of items to print. (default: 10)
--escape : C-style escaping is applied to the TSV data.
--keys : Prints keys only.
- Options for the queue subcommand:
--notify : Sends notifications when queueing.
--retry num : The maximum wait time in seconds before retrying.
--escape : C-style escaping is applied to the output data.
--key : Prints the key too.
--compex str : Calls CompareExchange with the given key.
- Options for the sync subcommand:
--hard : Does physical synchronization with the hardware.
- Options for the search subcommand:
--mode str : The search mode: contain, begin, end, regex, edit, editbin, upper, upperinc, lower, lowerinc. (default: contain)
--items num : The number of items to retrieve. (default: 10)
--escape : C-style escaping is applied to the TSV data.
--keys : Prints keys only.
- Options for the changemaster subcommand:
- Options for the replicate subcommand:
--ts_file str : The replication timestamp file.
--ts_from_dbm : Uses the database timestamp if the timestamp file doesn't exist.
--ts_skew num : Skews the timestamp by a value.
--server_id num : The server ID of the client.
--wait num : The time in seconds to wait for the next log.
--items num : The number of items to print. (default: 10)
--escape : C-style escaping is applied to the TSV data.
The following is a sample usage to make a remote database which associates country names with their capital cities' names. If the extension of the file is "tkh", the type of the database is regarded as HashDBM.
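For example, run the server with a file hash database, then store and retrieve records from another terminal (the file name and records are illustrative):

tkrzw_server casket.tkh
tkrzw_dbm_remote_util set japan tokyo
tkrzw_dbm_remote_util set france paris
tkrzw_dbm_remote_util get japan
tkrzw_dbm_remote_util list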
You can check the current status of the server with the "inspect" subcommand. If the "--index" is zero or positive, the metadata of the corresponding database is shown. If it is negative, the metadata of the server is shown.
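For example:

tkrzw_dbm_remote_util inspect --index 0
tkrzw_dbm_remote_util inspect --index -1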
"memory_usage" represents the memory usage by the process, in bytes. It is actually the RSS (resident set size) which includes the memory size used by the process and the memory size used by the shared libraries. "memory_capacity" represents the capacity of RAM available on the system, in bytes. "num_active_calls" represents the number of active RPC calls. "running_time" represents the elapsed time since the server starts.
tkrzw_dbm_remote_perf: Performance Checker
tkrzw_dbm_remote_perf is a command for checking performance of RPC invocation. Thereby, you can check performance of remote database operations in various scenarios including multithreading. A subcommand name comes as the first argument. Some options and other arguments can follow.
tkrzw_dbm_remote_perf sequence [options]
- Checks echoing/setting/getting/removing performance in sequence.
tkrzw_dbm_remote_perf parallel [options]
- Checks setting/getting/removing performance in parallel.
tkrzw_dbm_remote_perf wicked [options]
- Checks consistency with various operations.
tkrzw_dbm_remote_perf queue [options]
- Checks queueing and dequeueing operations.
- Common options:
--address : The address and the port of the service. (default: localhost:1978)
--timeout : The timeout in seconds for connection and each operation.
--auth configs : Enables authentication with the configuration.
--index : The index of the DBM to access. (default: 0)
--iter num : The number of iterations. (default: 10000)
--size num : The size of each record value. (default: 8)
--threads num : The number of threads. (default: 1)
--separate : Uses separate instances for each thread.
--random_seed num : The random seed, or negative for a real RNG. (default: 0)
- Options for the sequence subcommand:
--random_key : Uses random keys rather than sequential ones.
--random_value : Uses random length values rather than fixed ones.
--echo_only : Does only echoing.
--set_only : Does only setting.
--get_only : Does only getting.
--iter_only : Does only iterating.
--remove_only : Does only removing.
--stream : Uses the stream API.
--ignore_result : Ignores the result status of streaming updates.
--multi num : Sets the size of a batch operation with xxxMulti methods.
- Options for the parallel subcommand:
--random_key : Uses random keys rather than sequential ones.
--random_value : Uses random length values rather than fixed ones.
--stream : Uses the stream API.
- Options for the wicked subcommand:
--iterator : Uses iterators occasionally.
--clear : Clears the database occasionally.
--rebuild : Rebuilds the database occasionally.
- Options for the queue subcommand:
--notify : Sends notifications when queueing.
--retry num : The maximum wait time in seconds before retrying.
The following is a sample usage to echo back 1 million messages, to store 1 million records of 8-byte keys and 8-byte values, to retrieve all of them, and to remove all of them in sequence. The number of threads is 10 and each thread does 100 thousand operations.
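One way to express that scenario with the documented options (the address defaults to localhost:1978):

tkrzw_dbm_remote_perf sequence --iter 100000 --threads 10 --size 8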
RemoteDBM: Remote Database API
The remote database is an interface to access the database service of Tkrzw-RPC. It encapsulates the existence of the network layer so that you can use the features as if you were operating local databases. RemoteDBM is thread-safe, so multiple threads can share the same instance, which saves the number of connections. While the server supports both the synchronous API and the asynchronous API, RemoteDBM supports only the synchronous API on the client side. Combining the asynchronous API on the server side and the synchronous API on the client side is possible, and it is usually the best setting because it maximizes the throughput of the server and simplifies the client code structure.
Before any operation, you have to call the Connect method to make a connection to the database server. You set the server address and the port number, like "192.168.0.8:1978" or "example.com:8080". To use a UNIX domain socket, set the path like "unix:/run/tkrzw_server.socket". You can close the connection by calling the Disconnect method.
The database service can handle multiple databases at the same time. By default, the target of operations by a RemoteDBM instance is the first database of the service. To access the second database, call the SetDBMIndex method with the parameter 1. To access the third database, set the parameter 2. If multiple threads use different indices, they should use separate instances of RemoteDBM.
To retrieve, store, and remove a record, you call the Get, Set, and Remove methods respectively. If you handle multiple records at once, calling the GetMulti, SetMulti, and RemoveMulti methods is better in terms of performance. CompareExchange, CompareExchangeMulti, and Increment are useful methods to do atomic operations.
The MakeStream method makes an instance of the Stream class. A stream is bound to one thread on the server. If you call Get, Set, and Remove intensively, calling them via a stream gives you better performance. The MakeIterator method makes an instance of the Iterator class. An iterator is also bound to one thread on the server, and it allows stateful operations like First, Jump, Next, and Get. Stream objects and iterator objects should be destructed as soon as possible, in order to release the server threads.
Most methods return a Status object to represent the result of the operation. The meaning of the status code is the same as the local API except for the code NETWORK_ERROR which represents errors from gRPC.
Example Code
This is a code example where basic operations are done without checking errors.
#include <iostream>
#include "tkrzw_dbm_remote.h"
using namespace tkrzw;

int main(int argc, char** argv) {
  // Connects to the server at the default address and stores records.
  RemoteDBM dbm;
  dbm.Connect("localhost:1978");
  dbm.Set("foo", "hop");
  dbm.Set("bar", "step");
  // Traverses records with an iterator.
  std::unique_ptr<RemoteDBM::Iterator> iter = dbm.MakeIterator();
  iter->First();
  std::string key, value;
  while (iter->Get(&key, &value) == Status::SUCCESS) {
    std::cout << key << ":" << value << std::endl;
    iter->Next();
  }
  // Disconnects the connection.
  dbm.Disconnect();
  return 0;
}
This is a code example which represents a more serious use case with thorough error checks.
// Makes a stream for intensive operations.
std::unique_ptr<RemoteDBM::Stream> stream = dbm.MakeStream();
// Stores records.
// Bit-or assignment to the status updates the status if the original
// state is SUCCESS and the new state is an error.
status |= stream->Set("foo", "hop");
status |= stream->Set("bar", "step");
status |= stream->Set("baz", "jump");
if (status != Status::SUCCESS) {
// The Set operation shouldn't fail. So we stop if it happens.
Die("Set failed: ", status);
}
// Store records, ignoring the result status.
stream->Set("quux", "land", true, true);
stream->Set("xyzzy", "rest", true, true);
// Retrieves records.
// If there was no record, NOT_FOUND_ERROR would be returned.
std::string value;
status = stream->Get("foo", &value);
if (status == Status::SUCCESS) {
std::cout << value << std::endl;
} else {
std::cerr << "missing: " << status << std::endl;
}
// Destroys the stream if it is not used any longer.
stream.reset(nullptr);
// Makes an iterator for traversal operations.
std::unique_ptr<RemoteDBM::Iterator> iter = dbm.MakeIterator();
// Traverses records.
if (iter->First() != Status::SUCCESS) {
// Failure of the First operation is critical so we stop.
Die("First failed: ", status);
}
while (true) {
// Retrieves the current record data.
std::string iter_key, iter_value;
status = iter->Get(&iter_key, &iter_value);
if (status == Status::SUCCESS) {
std::cout << iter_key << ":" << iter_value << std::endl;
} else {
// This happens at the end of iteration.
if (status != Status::NOT_FOUND_ERROR) {
// Error types other than NOT_FOUND_ERROR are critical.
Die("Iterator::Get failed: ", status);
}
break;
}
// Moves the iterator to the next record.
status = iter->Next();
if (status != Status::SUCCESS) {
// This could happen if another thread removed the current record.
if (status != Status::NOT_FOUND_ERROR) {
// Error types other than NOT_FOUND_ERROR are critical.
Die("Iterator::Get failed: ", status);
}
std::cerr << "missing: " << status << std::endl;
break;
}
}
// Destroys the iterator if it is not used any longer.
iter.reset(nullptr);
// Disconnects the connection.
// Even if you forgot to disconnect it, the destructor would do it.
// However, checking the status is a good manner.
status = dbm.Disconnect();
if (status != Status::SUCCESS) {
// The Disconnect operation shouldn't fail. So we stop if it happens.
Die("Disconnect failed: ", status);
}
return 0;
}
Tips
Typical Usage for Online Service
HashDBM is designed to combine high performance and durability for use in online services. Let's say you run an online service and manage user data in a hash database. The database is served by tkrzw_server, and thousands of clients connect to the server, issuing queries at 100K QPS at peak time. You run the server like this.
With the "--async" option, you use the asynchronous API of gRPC, which maximizes the throughput. With the "--threads 8" option, the number of "completion queues" and the number of threads to consume tasks in each queue are set to 8. The value should be the same as the number of CPU cores.
With "num_buckets=10M", the initial number of hash buckets is 10 million, which assumes the number of users is currently 10 million or less. With "update_mode=appending", the update mode is set to appending, which realizes high durability and reasonable update performance on a large database.
As the update mode is appending, the file size of the database grows with every update operation, so you should rebuild the database periodically. To do so, you set the following cron script, which is invoked every midnight.
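A sketch of such an invocation (paths are illustrative; the tuning parameters are explained below):

tkrzw_server --async --threads 8 --daemon --pid_file /var/run/tkrzw_server.pid --log_file /var/log/tkrzw_server.log "/var/lib/tkrzw/casket.tkh#num_buckets=10M,update_mode=appending"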
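An illustrative crontab entry (the installation path is an assumption):

0 0 * * * /usr/local/bin/tkrzw_dbm_remote_util rebuild >/dev/null 2>&1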
&1 /dev/null ]]>
One of the most important features of HashDBM is that the Rebuild method is done without blocking other threads. So, you can rebuild the database while serving it online. The number of buckets is implicitly optimized for the current number of records.
Measures for Durability
If the server process or the entire operating system crashes unexpectedly, the database file is marked "unhealthy". If you start the server with an unhealthy database file, the file is restored automatically. By default, the library makes the best effort to restore as many updates as possible. How many updates are actually restored depends on the behavior of the underlying file system, operating system, and storage device.
If you want to guarantee that past updates up to a point are fully written to the storage device, you should synchronize the database periodically or at arbitrary critical points of operation. If you invoke it every 10 minutes, you set the following cron script.
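An illustrative crontab entry for that schedule:

*/10 * * * * /usr/local/bin/tkrzw_dbm_remote_util sync --hard >/dev/null 2>&1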
&1 /dev/null ]]>
By setting the "--hard" option, synchronization is done to the storage device. It means that the current state is restored even on system crashes by the kernel panic, power failures, or etc. If you omit the option, synchronization is done to the file system, which is faster. It guarantees the data restoration only against process crashes by segmentation faults, the OOM killer, etc.
If you want ACID traits of transactions, add "restore_mode=sync" to the database parameters so that the state at the time of the last synchronization is restored. Client programs should call the Synchronize method after each transaction or at an arbitrary critical point of the business logic. As performance and durability is in a relation of trade-off, how you call syncoronization should be determined depending on the usage and content. See the durability settings of Tkrzw for details.
To make a backup file of a database while the server is running, you can use the "sync" subcommand and set the "make_backup" parameter whose value is empty. A date suffix in GMT like ".backup.20210901233046" is added to the database file name to generage the backup file name. If the value of "make_backup" is not empty, the value is used as the suffix. For security, the available characters are "[-_.0-9a-zA-Z]". The following cron setting makes a backup at 2:15 every night.
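An illustrative crontab entry; whether the empty value is written as "make_backup=" is an assumption based on the parameter format described above:

15 2 * * * /usr/local/bin/tkrzw_dbm_remote_util sync make_backup= >/dev/null 2>&1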
&1 /dev/null ]]>
Sharding the Database
Sharding is a technique to divide a database file into multiple files. There are two major benefits. One is that the concurrency of multi-threaded processing improves. The other is that database-wide operations are divided so that the blocking duration becomes shorter. Moreover, the size of each temporary file becomes smaller.
Let's say you shard a 1GB database into ten 102MB databases. Synchronizing the 1GB database would take 1 second, during which all other operations are blocked. In contrast, synchronizing one shard of a 102MB file takes 0.1 seconds, during which only operations on that shard are blocked. Although the total time of the synchronizing operation is the same, the total blocking time of other threads is much less. The same thing can be said about making backup files. Moreover, the space efficiency of rebuilding the database also improves, although rebuilding is not blocking. Rebuilding a 1GB database requires a 1GB temporary file. Meanwhile, rebuilding ten 102MB databases requires only one 102MB temporary file at a time because the temporary file is removed after the operation for each shard.
The downside of sharding is that you compromise on ACID traits of transactions. Although the CompareExchangeMulti operation is still atomic across threads and clients, it is not necessarily atomic across crashes. That's because the system can crash in the middle of synchronizing each shard. If you don't use CompareExchangeMulti or you can accept the possibility of inconsistency on crashes, sharding is worth trying.
Let's set up a server of a file hash database in ten shards. You just add the parameter "num_shards=10". Note that the values of parameters for each shard should be divided by 10. For example, 10 million hash buckets in total should be set with "num_buckets=1M".
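For example, combining the sharding and bucket parameters described above (the file name is illustrative):

tkrzw_server "casket.tkh#num_shards=10,num_buckets=1M"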
As we have 10 shards, "casket.tkh-00000-of-00010", "casket.tkh-00001-of-00010", ..., "casket.tkh-00009-of-00010" are created. If you make a backup, files like "casket.tkh.backup.20210908195557-00000-of-00010" et cetera are made.
External behavior via RPC is the same whether the database is sharded or not. Although sharding is done with a hash function, the iterator maintains the original order of the database if it is an ordered one (TreeDBM, SkipDBM, BabyDBM).
You can shard on-memory databases too. If an on-memory database is associated with a file, the blocking time during the Synchronize method might be an issue for online services. Sharding the database mitigates the issue. For example, if an on-memory tree database containing 100 million records takes 24 seconds for the Synchronize method, you can divide it into 100 shards. Then, one shard takes only 0.24 seconds, which is not a problem in many cases.
The number of shards can be as large as you like. The more shards there are, the better the concurrency performance is. The downside of too many shards is that the iterator and the SearchModal method become slower.
Message Queue Service
Tkrzw-RPC can be used for a simple message queue service. TreeDBM and BabyDBM organize records in ascending order of the key. The PushLast method generates a unique key from the current timestamp and stores the given value. The PopFirst method gets the first record and removes it atomically. Combining the two methods, FIFO (First-In-First-Out) operations are realized. Moreover, the two methods support features for real-time processing. The PopFirst method supports a retrying feature whereby the thread waits for the next record while there's no record in the database. The PushLast method supports a notification feature which wakes up the waiting PopFirst methods.
Let's see the actual behaviors of the FIFO operations. First, run the server with a tree database.
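For example, using a file tree database (the file name is illustrative):

tkrzw_server --async "casket.tkt"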
Call the PushLast method with the values "one", "two", and "three". The keys are generated implicitly.
Call the PopFirst method until the database becomes empty. The "--key" option shows the keys, which were generated automatically from the timestamp.
Call the PopFirst method with the empty database and have it wait for 5 seconds.
Then, call the PopFirst method with the empty database and have it wait for 30 seconds. This time, while the terminal for PopFirst is blocked, call PushLast on another terminal to add a new record. Specify the "--notify" option to send a notification to the waiting threads.
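With the command line utility, the queue subcommand with a value enqueues a record (PushLast):

tkrzw_dbm_remote_util queue one
tkrzw_dbm_remote_util queue two
tkrzw_dbm_remote_util queue three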
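Without a value, the queue subcommand dequeues the first record (PopFirst); "--key" also prints the generated key:

tkrzw_dbm_remote_util queue --key
tkrzw_dbm_remote_util queue --key
tkrzw_dbm_remote_util queue --key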
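For example, with the "--retry" option specifying the maximum waiting time:

tkrzw_dbm_remote_util queue --key --retry 5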
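For example, on the first terminal and the second terminal respectively:

tkrzw_dbm_remote_util queue --key --retry 30
tkrzw_dbm_remote_util queue --notify four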
You'll see "four" appear on the first terminal immediately after you run the command on the second terminal. This style is called "long polling", which is seen with the "Comet" model of web applications. You can do the same thing in C++ and other languages.
While the PopFirst operation is blocked waiting for the next record, it occupies a thread on the server in the synchronous mode. Thus, it is recommended to run the server in the asynchronous mode, where the blocking operation is done in the background. If you dare to use the synchronous mode, the number of worker threads should be more than the number of clients calling PopFirst.
Note that the records fetched by PopFirst are removed implicitly, regardless of whether the consumer tasks succeed in their operations. In other words, there's no retrying feature for failed consumers. If you want such mechanisms, choose other message queueing services with more complex features, or write your own service with Tkrzw.
Monitoring Records with CompareExchange
The CompareExchange method supports monitoring and notification mechanisms. For monitoring, you set the "retry_wait" parameter, which specifies the maximum time to wait before retrying the operation if the pre-condition is not met. For notification, you specify the "notify" parameter true to wake up waiting threads for retrying.
Let's say, you want to get the value of the record whose key is "abc" and remove it atomically. And, the operation should be done immediately after the record is set by another process. You write the following code. Note that the default constructor of std::string_view refers to nullptr as its data.
The following code adds the record of "abc" and notify it to wake up waiting threads for retrying.
CompareExchange can substitute for any database operation including Get, Set, and Remove. If you don't want an update, set DBM::ANY_DATA as the post-condition. If you overwrite an existing record, set DBM::ANY_DATA as the pre-condition. Therefore, monitoring and notification can be combined with various kinds of database operations. Even retrying and notification can be done at the same time.
Let's confirm how monitoring and notification work with CompareExchange by using the command line utility. First, run the server.
On another terminal, run the following command, which works as the consumer. It tries to get the value of the record whose key is "abc" and remove it atomically. The "--compex" option is to use CompareExchange and specifies the key of the record to monitor. The "--retry" option specifies the maximum waiting time in seconds for retrying.
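Based on the queue subcommand options listed above, the consumer might look like this (the exact form is an assumption):

tkrzw_dbm_remote_util queue --compex abc --retry 30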
The above command blocks because there's no record of the key. During the waiting period, open another terminal and run the following command, which works as the producer. It adds the record whose key is "abc" and value is "hello". It also notifies the update to wake up waiting threads for retrying.
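And the producer, again as an assumption based on the documented options:

tkrzw_dbm_remote_util queue --compex abc --notify hello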
Immediately after the update is done by the producer, the block of the consumer ends and the value "hello" of the record is printed.
The producer command set "null" (default std::string_view) as the pre-condition. Thus, if there's an existing record of the key "abc", the operation fails. If you add the "--retry" option to the command, it retries to add the record until it succeeds or the deadline comes. This realizes a pseudo-queue in the namespace of the key. If you add the values "hop", "step", and "jump" for the key "abc" immediately after each is cosumed, you run the following command.
Then, the consumer should do notification to wake up the retrying producer thread.
SSL Authentication and Encryption
gRPC supports networking over SSL to authenticate peers and protect their communication by encryption. To enable SSL, each of the server and the clients uses three files: its private key, its own certificate, and the certificate of the root CA. With SSL enabled, the server accepts a client only if the certificate sent from the client is verified by the root CA certificate specified on the server. Data transmitted between the server and the client is encrypted, so third parties cannot read the communication and you can use the database service over public networks such as the Internet.
To enable SSL on the server side, specify the "--auth" option, which takes an expression in the "ssl:key=...,cert=...,root=..." format to specify the paths of the three PEM files: the server private key, the certificate of the server, and the certificate of the root CA. Note that the CN property of the server certificate must match the actual host name (FQDN) of the server.
To enable SSL on the client utilities, specify the "--auth" option in the same way as for the server, to specify the client private key, the certificate of the client, and the certificate of the root CA.
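For example (the file paths and database name are illustrative):

tkrzw_server --auth "ssl:key=/etc/tkrzw/server-key.pem,cert=/etc/tkrzw/server-cert.pem,root=/etc/tkrzw/root-cert.pem" "casket.tkh"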
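For example (the host name and file paths are illustrative):

tkrzw_dbm_remote_util get --address example.com:1978 --auth "ssl:key=/etc/tkrzw/client-key.pem,cert=/etc/tkrzw/client-cert.pem,root=/etc/tkrzw/root-cert.pem" japan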
The same thing can be said with the Connect method of the client API. Other languages than C++ also support the authentication option in the same format.
Password authentication is discouraged these days because of the relatively high risk of leaking. If you want to limit clients only to trusted ones, you should prepare certificates for them with your own CA using OpenSSL etc. You install the certificate files on the trusted clients and set the CA certificate on the server.
Asynchronous Replication
Overview
High availability is a concept of continuing the service without any downtime even if troubles such as hardware failures happen. Data replication is storing the same data on multiple machines, which is a common way to achieve high availability. Tkrzw-RPC supports asynchronous replication, where data replication is done asynchronously, without waiting for the update data to reach a replica server. It means that updates are lost if the server dies before the replica server replicates the updates. Meanwhile, the performance of asynchronous replication is much better than synchronous replication because the latency of each update query doesn't include the latency of the response of the replica server.
We call the server to which update queries are issued the "master". We call another server, which monitors the updates on the master and replicates them, the "slave". The master has to store update logs locally, which is realized by the message queue feature of the core library of Tkrzw. The slave issues the "Replicate" RPC call to the master to fetch the update logs since a timestamp. The slave applies the fetched updates to its own databases so that the databases on the master and the databases on the slave are synchronized. The slave manages the last timestamp of the fetched updates and uses it to resume replication efficiently.
As the databases on the master and the slave have the same content with only a slight delay, clients can retrieve data from either of them. You can set up two or more slaves for load balancing of retrieval queries. Updating queries must be called only to the master for consistency.
If the master dies, one of the slaves is promoted to be the master. The other slaves, if any, follow the new master. If a slave dies, a new slave is added to keep high availability. Usually, a slave is set up with a backup database and then its content is synchronized to the latest state by fetching updates since the timestamp of the backup database. Usually, the slave is also configured to store update logs so that it can be promoted to be the master anytime the original master dies.
Replication Client
Let's set up a master server and monitor the updates with a client utility. To set up the master, you have to specify the server ID and the prefix of the update log files.
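A sketch using the documented options (the file names are illustrative):

tkrzw_server --server_id 1 --ulog_prefix casket-ulog "casket.tkh"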
If you use the synchronous mode by omitting "--async", the number of threads should be increased by the estimated number of slaves because one slave occupies one thread. In the asynchronous mode, replication sessions and other sessions share the same threads.
On another terminal, issue update queries to the master. The master database comes to have the two records "one" and "two". The two corresponding update logs are stored in the update log file.
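For example (the values are arbitrary):

tkrzw_dbm_remote_util set one hop
tkrzw_dbm_remote_util set two step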
Check the content of the update log file. This command doesn't finish but waits for the next updates. Keep it unfinished.
On another terminal, send more update queries. Two new update logs appear on the above terminal. Then, finish the command on the above terminal by inputting Ctrl-C.
You can retrieve update logs and apply them to local databases. You specify the paths of the database files where the update logs are applied. If a file doesn't exist, a new database is created. Also, you create the timestamp file with the "--ts_file" option to store the last timestamp. The stored value is used as the minimum timestamp when the command is executed again. The "--ts_skew" option specifies a skew applied to the minimum timestamp to absorb the time difference between servers. You specify "--wait 0" and "--items 0" so as not to wait for future updates. After that, check the content of the updated database.
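A sketch with the documented options (the file names are illustrative):

tkrzw_dbm_remote_util replicate --address localhost:1978 --ts_file backup-ts.txt --wait 0 --items 0 "backup-casket.tkh"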
In this way, you can make backup databases. If the server manages multiple databases, you should specify the same number of databases. You can update the backup databases by the same command. The timestamp stored in the timestamp file is used to get update logs since the timestamp.
Master-slave Topology
The simplest topology of data replication is composed of one master and one slave. Let's reuse the master server and the backup file for this exercise. To set up a slave server, copy the backup database files and the timestamp file. Then, run the server command specifying the address of the master server. As we do this exercise on a single machine and the master uses the port 1978, we use the port 1979 for the slave server. The slave also has a server ID, which must be unique within your service architecture. You should configure update logs for the slave server too, because the slave can be treated as the master in the future.
The update logs between the timestamp stored in the timestamp file and the current time are retrieved and applied to the database automatically. And, updates from now on are also synchronized on the spot. To confirm it, put some records onto the master server and then list up the content of the slave database.
Let's assume that the master dies and we promote the slave as the new master. Stop the master and then add a new slave. This time, we don't specify the address of the new master but set it later.
At this point, replication is not started. Thus, the records "five" and "six" don't exist.
Let's start the replication by setting the master address on the slave server dynamically. In this case, "localhost:1980" is the new slave and "localhost:1979" is the new master. "--ts_skew -10000" means that replication starts from a point 10 seconds before the current timestamp.
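For example, issued against the slave at port 1980 (assuming the changemaster subcommand accepts "--ts_skew" as described):

tkrzw_dbm_remote_util changemaster --address localhost:1980 --ts_skew -10000 localhost:1979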
Confirm that the content has been synchronized automatically.
While updating queries must be done on the master, retrieval (non-updating) queries can be done on either the master or the slave. Moreover, if retrieval queries are much more than updating queries, using multiple slave servers and doing load balancing is a good way to improve the throughput.
The timestamp of replication must always be given by the master. The timestamp means a breakpoint from which the replication resumes on the master side. The content of the timestamp file is the last timestamp given by the master, which is independent of the local clock. Thus, even if the clock of the slave is skewed, replication works properly. However, if the master dies and a slave becomes the new master, the timestamp is evaluated on the timeline of the new master machine, which is different from the old master. Therefore, you should set the "--repl_ts_skew" option of tkrzw_server or the "--ts_skew" option of tkrzw_dbm_remote_util with a negative value to absorb a possible time leap. Note that update logs are idempotent, so duplicated application is acceptable.
Dual Masters Topology
Whereas the master-slave topology provides the basics of high availability, it still has downtime for update operations. Between the time when the master dies and the time when the new master is set up and announced to the clients, updating operations cannot be done. One workaround is to treat a pre-determined "prime" slave as the acting master. If clients cannot access the master, they can call updating operations on the acting master. However, it causes a potential problem of inconsistency. For some reason, even if the master is alive, some clients can be unable to access the master, so they update the acting master. Then, if the acting master doesn't become the actual master, updates to the acting master are lost.
The dual masters topology can solve this issue. If the master replicates from the prime slave, accidental updates to the prime slave are fed back to the master. In the context of the dual masters topology, the actual master is called the "active master" and the prime slave is called the "standby master". To minimize possible inconsistency, clients should usually update the active master. Only if they cannot reach the active master are they allowed to update the standby master. The point is, each client can determine fallback measures by itself. It avoids downtime completely at the risk of potential inconsistency, which happens when the same record is updated on the active master and the standby master simultaneously.
Let's set up dual masters. Before that, stop the servers of the previous exercise and remove related files.
The server #1 at the port 1978 is the active master and the server #2 at the port 1979 is the standby master. They replicate each other. Note that the server IDs must be unique.
Confirm that updates to one server are propagated to the other timely.
Let's simulate that the active master dies by inputting Ctrl-C on its terminal. The server #2 is considered the new active master. Then, set up the server #3 at the port 1980 as the new standby master. This time, we specify a non-existing database file so a new empty database is created.
Confirm that the new database has been synchronized automatically.
Modify the master address of the new active master to the new standby master.
Confirm that the mutual replication is working.
You might be sceptical whether each and every update on one server is consistently replicated to the other server. Then, let's do a load test where 10 threads update the active master server simultaneously, 100 million times in total.
Confirm that the numbers of records are the same on both servers.
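One way to express that load with the performance checker (the address of the new active master and the per-thread iteration count are illustrative):

tkrzw_dbm_remote_perf sequence --address localhost:1979 --threads 10 --iter 10000000 --set_only --random_key --random_value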
If you add more slaves, usually they specify the active master as their master. It minimizes the delay of propagation of updates. And, the operation to modify the master setting is simple. Although it is possible for the slaves to follow the standby master, the delay is larger and the switching operation is more complex.
You use multiple machines in the actual production service. Moreover, you can shard one logical database into multiple shards each of which is composed of dual masters. If the number of servers in operation is more than ten, you should automate the system configuration and server setups, combined with a monitoring system.
Managing Backup Files
Basically, backup files should be stored on a different host from the server hosts, for the sake of fault tolerance. Let's call the host that stores backup files the "storage host". When you make a backup file for the first time, you should make an empty database file with proper tuning parameters on the storage host. For example, if you expect the number of records to be 10 million, the number of hash buckets of HashDBM should be about 20 million.
You can also make a backup file on a database server, which can be a master or a slave. Then, copy the file to the storage host.
Then, you should copy the latest backup file as another file named with a date suffix like "20200211023001".
Periodically, you update the latest backup file by replicating updates from a database server, which should be the master server to minimize the delay. The timestamp at which the replication resumes is specified by the timestamp file set with the "--ts_file" option. If the timestamp file doesn't exist or is empty, the "--ts_from_dbm" option extracts the timestamp from the database file. If neither works, replication starts from the oldest update log on the server.
You can also make a full backup with the above "tkrzw_dbm_remote_util sync" command. However, making a full backup blocks the operations of the server for seconds or minutes. Thus, differential backup by replication is usually preferable.
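A sketch of such a periodic differential update (the host and file names are illustrative):

tkrzw_dbm_remote_util replicate --address master-host:1978 --ts_file casket-ts.txt --ts_from_dbm --wait 0 --items 0 "casket.tkh"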
Sometimes, you can also rebuild the backup file to optimize for time/space efficiency.
After that, you should copy the latest backup file as a file with the date suffix in the same way described above. Remove the oldest backup files if the storage usage is close to the capacity.
You can check the timestamp value by reading the content of the timestamp file. Note that the value is in milliseconds and calculated by the clock on the master server.
In the above procedures, you don't save the timestamp file for each backup file. However, you can use the timestamp embedded in the backup database file. Note that the timestamp in the database file is in seconds so you should multiply it by 1000 to specify as a timestamp for Tkrzw-RPC. This information is used by the "--ts_from_dbm" option of "tkrzw_dbm_remote_util" and "--repl_ts_from_dbm" of "tkrzw_server".
Note that the timestamp of the database represents the time when the database was last closed by a writer process. Thus, if you have opened the backup database in writable mode, this method is not feasible. Anyway, you should never update the backup files with date suffixes.
Remove Old Update Log Files
Update log files accumulate on each server. By default, when the file size exceeds 1GB, a new file with a new ID suffix is created. After you make backup database files, you can remove update log files which are older than the backup files. You can check the timestamp of each update log file with the following command. It shows the file path, the ID, the timestamp of the last record, the oldness of the timestamp, and the file size.
You can remove update log files, except for the latest one, while the server is running. If you want to remove files whose timestamps are older than 3 days ago, you run the following command. Don't forget the "-" of "-3D".
In theory, after you make a full backup or a differential backup, you can remove all update logs whose timestamps are older than the timestamp of the backup file. However, retaining update logs for a decent period is a good practice for traceability. It is also a good practice to copy the old update log files to the storage host before removing them.
Frequently Asked Questions
- Q: Can I write a client library for a language?
- A: Yes, please. Fork the tkrzw_rpc.proto file to add options for the package name etc.
- Q: Do you plan to support Windows?
- A: Currently, no. It's a matter of priority.
License
Tkrzw-RPC is written mainly by Mikio Hirabayashi, copyrighted by Google LLC, and distributed under the Apache License 2.0. See the COPYING file in the package for details.