Tkrzw
Classes | Public Types | Public Member Functions | Static Public Member Functions | List of all members
tkrzw::MessageQueue Class Referencefinal

Message queue on the file stream. More...

#include <tkrzw_message_queue.h>

Classes

class  Reader
 Messsage reader. More...
 

Public Types

enum  OpenOption : int32_t {
  OPEN_DEFAULT = 0 , OPEN_TRUNCATE = 1 << 0 , OPEN_SYNC_HARD = 1 << 1 , OPEN_READ_ONLY = 1 << 2 ,
  OPEN_IGNORE_BROKEN = 1 << 3
}
 Enumeration of options for Open. More...
 

Public Member Functions

 MessageQueue ()
 Default constructor. More...
 
 ~MessageQueue ()
 Destructor. More...
 
Status Open (const std::string &prefix, int64_t max_file_size, int32_t options=OPEN_DEFAULT)
 Opens the queue files to store the messages. More...
 
Status Close ()
 Closes the queue files. More...
 
Status CancelReaders ()
 Cancels operations of the current readers. More...
 
Status Write (int64_t timestamp, std::string_view message)
 Writes a message. More...
 
Status UpdateTimestamp (int64_t timestamp)
 Updates the timestamp without writing a message. More...
 
Status Synchronize (bool hard)
 Synchronizes the metadata and content to the file system. More...
 
int64_t GetTimestamp ()
 Gets the latest timestamp. More...
 
std::unique_ptr< ReaderMakeReader (int64_t min_timestamp)
 Makes a message reader. More...
 

Static Public Member Functions

static Status FindFiles (const std::string &prefix, std::vector< std::string > *paths)
 Finds files matching the given prefix and the date format suffix. More...
 
static uint64_t GetFileID (const std::string &path)
 Gets the ID number of a message file path. More...
 
static Status ReadFileMetadata (const std::string &path, int64_t *file_id, int64_t *timestamp, int64_t *file_size)
 Reads the metadata of a message file path. More...
 
static Status RemoveOldFiles (const std::string &prefix, int64_t threshold, bool exclude_latest=false)
 Remove files whose newest message is older than a threshold. More...
 
static Status ReadNextMessage (File *file, int64_t *file_offset, int64_t *timestamp, std::string *message, int64_t min_timestamp=0)
 Reads the next message from a file. More...
 
static int64_t ParseTimestamp (std::string_view expr, int64_t base_time)
 Parses a timestamp expression. More...
 

Detailed Description

Message queue on the file stream.

Member Enumeration Documentation

◆ OpenOption

Enumeration of options for Open.

Enumerator
OPEN_DEFAULT 

The default behavior.

OPEN_TRUNCATE 

To truncate the file.

OPEN_SYNC_HARD 

To do physical synchronization for each output.

OPEN_READ_ONLY 

To not write anything.

OPEN_IGNORE_BROKEN 

To ignore broken records.

Constructor & Destructor Documentation

◆ MessageQueue()

tkrzw::MessageQueue::MessageQueue ( )

Default constructor.

◆ ~MessageQueue()

tkrzw::MessageQueue::~MessageQueue ( )

Destructor.

Member Function Documentation

◆ Open()

Status tkrzw::MessageQueue::Open ( const std::string &  prefix,
int64_t  max_file_size,
int32_t  options = OPEN_DEFAULT 
)

Opens the queue files to store the messages.

Parameters
prefixThe prefix for the file names. The actual name of each file has the suffix of ten digits of the decimal ID, like "0000000000" and "0000123456".
max_file_sizeThe maximum size of each file. When the actual file exceeds the limit, a new file is created and new messages are written into it.
optionsBit-sum options of MessageQueue::OpenOption enums.
Returns
The result status.

◆ Close()

Status tkrzw::MessageQueue::Close ( )

Closes the queue files.

Returns
The result status.

◆ CancelReaders()

Status tkrzw::MessageQueue::CancelReaders ( )

Cancels operations of the current readers.

Returns
The result status.

◆ Write()

Status tkrzw::MessageQueue::Write ( int64_t  timestamp,
std::string_view  message 
)

Writes a message.

Parameters
timestampThe timestamp in milliseconds of the massage. If it is negative, the current wall time is specified.
messageThe message data.
Returns
The result status.

◆ UpdateTimestamp()

Status tkrzw::MessageQueue::UpdateTimestamp ( int64_t  timestamp)

Updates the timestamp without writing a message.

Parameters
timestampThe timestamp in milliseconds of the massage. If it is negative, the current wall time is specified.

◆ Synchronize()

Status tkrzw::MessageQueue::Synchronize ( bool  hard)

Synchronizes the metadata and content to the file system.

Parameters
hardTrue to do physical synchronization with the hardware or false to do only logical synchronization with the file system.
Returns
The result status.

The metadata of the current file size is not written to the file until this method is called. When an external process reads the latest file, it reads the region only to the offset of the metadata file size. If OPEN_SYNC_HARD is set to the options of the Open method, synchronization is done implicitly for each write.

◆ GetTimestamp()

int64_t tkrzw::MessageQueue::GetTimestamp ( )

Gets the latest timestamp.

Returns
The latest timestamp, or -1 on failure.

◆ MakeReader()

std::unique_ptr<Reader> tkrzw::MessageQueue::MakeReader ( int64_t  min_timestamp)

Makes a message reader.

Parameters
min_timestampThe minimum timestamp in milliseconds of messages to read.
Returns
The message reader.

◆ FindFiles()

static Status tkrzw::MessageQueue::FindFiles ( const std::string &  prefix,
std::vector< std::string > *  paths 
)
static

Finds files matching the given prefix and the date format suffix.

Parameters
prefixThe prefix for the file names.
pathsThe pointer to a vector object to store the result paths.
Returns
The result status. Even if there's no matching file, SUCCESS is returned.

The matched paths are sorted in ascending order of the file names.

◆ GetFileID()

static uint64_t tkrzw::MessageQueue::GetFileID ( const std::string &  path)
static

Gets the ID number of a message file path.

Parameters
pathThe path of the message file.
Returns
The ID number of the message file.

◆ ReadFileMetadata()

static Status tkrzw::MessageQueue::ReadFileMetadata ( const std::string &  path,
int64_t *  file_id,
int64_t *  timestamp,
int64_t *  file_size 
)
static

Reads the metadata of a message file path.

Parameters
pathThe path of the message file.
file_idThe pointer to a variable to store the file ID.
timestampThe pointer to a variable to store the timestamp in milliseconds. The timestamp represents the newest record in the file.
file_sizeThe pointer to a variable to store the file size.
Returns
The result status.

◆ RemoveOldFiles()

static Status tkrzw::MessageQueue::RemoveOldFiles ( const std::string &  prefix,
int64_t  threshold,
bool  exclude_latest = false 
)
static

Remove files whose newest message is older than a threshold.

Parameters
prefixThe prefix for the file names.
thresholdThe threshold timestamp in milliseconds.
exclude_latestIf true, the latest file is never removed.
Returns
The result status. Even if no files are matched, it returns success.

◆ ReadNextMessage()

static Status tkrzw::MessageQueue::ReadNextMessage ( File file,
int64_t *  file_offset,
int64_t *  timestamp,
std::string *  message,
int64_t  min_timestamp = 0 
)
static

Reads the next message from a file.

Parameters
fileThe file object to read from.
file_offsetThe pointer to the variable containing the offset to read and to store the offset of the next record. If the initial value can be zero to read the first record.
timestampThe pointer to a variable to store the timestamp in milliseconds of the message.
min_timestampThe minimum timestamp in milliseconds to fill the message data.
messageThe pointer to a string object to store the msssage data.
Returns
The result status. If the remaining part from the offset is filled with null codes, CANCELED_ERROR is returned.

◆ ParseTimestamp()

static int64_t tkrzw::MessageQueue::ParseTimestamp ( std::string_view  expr,
int64_t  base_time 
)
static

Parses a timestamp expression.

Parameters
exprThe timestamp expression. If it begins with "+" or "-", the value is evaluated as relative value to the base_time. If it ends with "D" or "d", the unit is the day. If it ends with "H" or "h", the unit is the hour. If it ends with "M" or "m", the unit is the minute. If it ends with "s" or "S", the unit is the second. Otherwise, the unit is the millisecond.
base_timeThe base time in milliseconds.
Returns
The result timestamp in milliseconds.