Documentation
¶
Index ¶
- func BatchRecords(b kmsg.RecordBatch) ([]kmsg.Record, error)
- type ACL
- type Cluster
- func (c *Cluster) AddNode(nodeID int32, port int) (int32, int, error)
- func (c *Cluster) ApplyRetention()
- func (c *Cluster) Close()
- func (c *Cluster) Compact()
- func (c *Cluster) Control(fn func(kmsg.Request) (kmsg.Response, error, bool))
- func (c *Cluster) ControlKey(key int16, fn func(kmsg.Request) (kmsg.Response, error, bool))
- func (c *Cluster) CoordinatorFor(key string) int32
- func (c *Cluster) CurrentNode() int32
- func (c *Cluster) DropControl()
- func (c *Cluster) KeepControl()
- func (c *Cluster) LeaderFor(topic string, partition int32) int32
- func (c *Cluster) ListenAddrs() []string
- func (c *Cluster) MoveTopicPartition(topic string, partition, nodeID int32) error
- func (c *Cluster) PartitionInfo(topic string, partition int32) *PartitionInfo
- func (c *Cluster) PartitionInfos(topic string) []*PartitionInfo
- func (c *Cluster) RehashCoordinators()
- func (c *Cluster) RemoveNode(nodeID int32) error
- func (c *Cluster) SetFollowers(topic string, partition int32, followers []int32)
- func (c *Cluster) ShufflePartitionLeaders()
- func (c *Cluster) SleepControl(wakeup func())
- func (c *Cluster) TopicIDInfo(id [16]byte) *TopicInfo
- func (c *Cluster) TopicInfo(topic string) *TopicInfo
- type LogLevel
- type Logger
- type Opt
- func AllowAutoTopicCreation() Opt
- func BrokerConfigs(configs map[string]string) Opt
- func ClusterID(clusterID string) Opt
- func DefaultNumPartitions(n int) Opt
- func EnableACLs() Opt
- func EnableSASL() Opt
- func GroupMaxSessionTimeout(d time.Duration) Opt
- func GroupMinSessionTimeout(d time.Duration) Opt
- func ListenFn(fn func(network, address string) (net.Listener, error)) Opt
- func MaxVersions(v *kversion.Versions) Opt
- func NumBrokers(n int) Opt
- func Ports(ports ...int) Opt
- func SeedTopics(partitions int32, ts ...string) Opt
- func SleepOutOfOrder() Opt
- func Superuser(method, user, pass string) Opt
- func TLS(c *tls.Config) Opt
- func User(method, user, pass string, acls ...ACL) Opt
- func WithLogger(logger Logger) Opt
- type PartitionInfo
- type TopicInfo
Functions ¶
func BatchRecords ¶
func BatchRecords(b kmsg.RecordBatch) ([]kmsg.Record, error)
BatchRecords returns the raw kmsg.Records within a record batch, or an error if they could not be processed.
Types ¶
type ACL ¶
type ACL struct {
// Principal in "User:<name>" format. When used with User(), this field
// is ignored and forced to "User:<username>".
Principal string
// Resource type: kmsg.ACLResourceTypeTopic, Group, Cluster, TransactionalId
Resource kmsg.ACLResourceType
// Name of the resource (topic name, group name, "*" for cluster, etc.)
Name string
// Pattern type: kmsg.ACLResourcePatternTypeLiteral or Prefixed
Pattern kmsg.ACLResourcePatternType
// Operation: kmsg.ACLOperationRead, Write, Create, Delete, Alter, Describe, etc.
Operation kmsg.ACLOperation
// Allow true for ALLOW, false for DENY
Allow bool
// Host to allow/deny from, defaults to "*" if empty
Host string
}
ACL defines an ACL entry for seeding the cluster.
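As a rough illustration of how the Pattern and Allow fields interact, the sketch below models ACL matching for a single principal, operation, and host. It uses a local acl type rather than the real struct, and it assumes the mock follows Apache Kafka's rule that a matching DENY overrides any ALLOW; neither the type nor the helper names come from this package.

```go
package main

import (
	"fmt"
	"strings"
)

// acl is a hypothetical stand-in holding only the fields this sketch needs.
type acl struct {
	name     string // resource name, or "*" for a literal wildcard
	prefixed bool   // true for a prefixed pattern, false for a literal one
	allow    bool   // true for ALLOW, false for DENY
}

// matches reports whether the ACL's pattern covers the resource name.
func (a acl) matches(resource string) bool {
	if a.prefixed {
		return strings.HasPrefix(resource, a.name)
	}
	return a.name == resource || a.name == "*"
}

// authorized returns true if at least one ALLOW matches and no DENY does.
// Assumption: DENY wins over ALLOW, as in Apache Kafka.
func authorized(acls []acl, resource string) bool {
	allowed := false
	for _, a := range acls {
		if !a.matches(resource) {
			continue
		}
		if !a.allow {
			return false // a matching DENY always wins
		}
		allowed = true
	}
	return allowed
}

func main() {
	acls := []acl{
		{name: "orders-", prefixed: true, allow: true},
		{name: "orders-private", prefixed: false, allow: false},
	}
	for _, topic := range []string{"orders-eu", "orders-private", "payments"} {
		fmt.Println(topic, authorized(acls, topic))
	}
}
```

With no matching ACL the request is denied, which is consistent with the EnableACLs note that all requests are denied until ACLs exist.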
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
Cluster is a mock Kafka broker cluster.
func MustCluster ¶
MustCluster is like NewCluster, but panics on error.
func NewCluster ¶
NewCluster returns a new mocked Kafka cluster.
func (*Cluster) AddNode ¶
AddNode adds a node to the cluster. If nodeID is -1, the next node ID is used. If port is 0 or negative, a random port is chosen. This returns the added node ID and the port used, or an error if the node already exists or the port cannot be listened on.
func (*Cluster) ApplyRetention ¶
func (c *Cluster) ApplyRetention()
ApplyRetention enforces retention.ms and retention.bytes on all topics, removing batches that are expired or exceed the size limit.
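The following is a minimal model of time-based and size-based retention, not the package's implementation. The batch type, the helper name, and the convention that a negative limit disables that check are assumptions made for illustration.

```go
package main

import "fmt"

// batch is a hypothetical stand-in for a stored record batch.
type batch struct {
	maxTimestampMs int64 // newest record timestamp in the batch, in ms
	sizeBytes      int64 // encoded size of the batch
}

// applyRetention drops batches from the front (oldest end) of the log.
// Assumption: a negative retentionMs or retentionBytes disables that limit.
func applyRetention(log []batch, nowMs, retentionMs, retentionBytes int64) []batch {
	// Time-based: remove leading batches whose newest record has expired.
	if retentionMs >= 0 {
		for len(log) > 0 && nowMs-log[0].maxTimestampMs > retentionMs {
			log = log[1:]
		}
	}
	// Size-based: remove leading batches until the total fits the limit.
	if retentionBytes >= 0 {
		var total int64
		for _, b := range log {
			total += b.sizeBytes
		}
		for len(log) > 0 && total > retentionBytes {
			total -= log[0].sizeBytes
			log = log[1:]
		}
	}
	return log
}

func main() {
	log := []batch{{1000, 400}, {5000, 400}, {9000, 400}}
	kept := applyRetention(log, 10000, 6000, 500)
	fmt.Println(len(kept), kept[0].maxTimestampMs) // prints "1 9000"
}
```

In the example, the first batch is removed because it is older than retention.ms, and the second is removed to bring the log under retention.bytes.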
func (*Cluster) Compact ¶
func (c *Cluster) Compact()
Compact triggers log compaction on all topics with cleanup.policy=compact. Records with duplicate keys are deduplicated, keeping only the latest value. Tombstones (nil value) older than delete.retention.ms are removed.
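The compaction rules above can be sketched as follows. This is a simplified model over a hypothetical record type, assuming string keys and millisecond timestamps; the real cluster operates on record batches.

```go
package main

import "fmt"

// record is a hypothetical stand-in for a stored record.
type record struct {
	key         string
	value       []byte // nil marks a tombstone
	timestampMs int64
}

// compact keeps only the latest record for each key, preserving log order,
// and drops tombstones older than deleteRetentionMs.
func compact(log []record, nowMs, deleteRetentionMs int64) []record {
	// Index of the last occurrence of each key.
	latest := make(map[string]int, len(log))
	for i, r := range log {
		latest[r.key] = i
	}
	var out []record
	for i, r := range log {
		if latest[r.key] != i {
			continue // superseded by a later record with the same key
		}
		if r.value == nil && nowMs-r.timestampMs > deleteRetentionMs {
			continue // expired tombstone
		}
		out = append(out, r)
	}
	return out
}

func main() {
	log := []record{
		{"a", []byte("1"), 100},
		{"b", []byte("2"), 200},
		{"a", []byte("3"), 300},
		{"b", nil, 400}, // tombstone for "b"
	}
	for _, r := range compact(log, 1000, 500) {
		fmt.Printf("%s=%s\n", r.key, r.value) // prints "a=3"
	}
}
```

Key "b" disappears entirely: its value is superseded by the tombstone, and the tombstone itself is older than the retention window.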
func (*Cluster) Control ¶
Control is a function to call on any client request the cluster handles. See Cluster.ControlKey for more details.
func (*Cluster) ControlKey ¶
ControlKey is a function to call on a specific request key that the cluster handles. If the key is -1, then the control function is run on all requests. For all possible keys, see kmsg.Key, for example kmsg.Produce.
If the control function returns true (handled), then either the response is written back to the client or, if the control function returns an error, the client connection is closed. If both the response and the error are nil, nothing is written: the cluster continues reading from the client, and the client will likely hit a read timeout at some point.
If the control function returns false (not handled), the next control function for this key runs. If no control function handles the request, the cluster processes it normally. This allows control functions that just observe or count requests without intercepting them.
Multiple control functions for the same key run in FIFO order (the order they were added).
Handling a request drops the control function from the cluster, meaning that a control function can only control *one* request. To keep the control function handling more requests, you can call KeepControl within your control function. Alternatively, if you want to just run some logic in your control function but then have the cluster handle the request as normal, you can call DropControl to drop a control function that did not handle the request.
It is safe to add new control functions within a control function.
Control functions are run serially. The one exception is when you use SleepControl, multiple control functions are "in progress", and you run Cluster.Close: closing a Cluster awakens all sleeping control functions.
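The dispatch rules described above (FIFO order, key -1 matching everything, one-shot handling, keep and drop) can be modelled in a few lines. This is a simplified sketch, not the cluster's code: the registry type, the controlFn signature, and passing keep and drop as callbacks in place of KeepControl and DropControl are all assumptions for illustration.

```go
package main

import "fmt"

// controlFn is a simplified control function. keep and drop stand in for
// Cluster.KeepControl and Cluster.DropControl.
type controlFn func(key int16, keep, drop func()) (handled bool)

type entry struct {
	key int16 // -1 matches every request key
	fn  controlFn
}

// registry holds control functions in the order they were added.
type registry struct{ entries []*entry }

func (r *registry) add(key int16, fn controlFn) {
	r.entries = append(r.entries, &entry{key, fn})
}

func (r *registry) remove(target *entry) {
	for i, e := range r.entries {
		if e == target {
			r.entries = append(r.entries[:i], r.entries[i+1:]...)
			return
		}
	}
}

// dispatch runs matching control functions in FIFO order and returns true
// if one of them handled the request. A false return means the request
// would fall through to normal processing.
func (r *registry) dispatch(key int16) bool {
	// Iterate over a copy so entries can be removed during the loop.
	for _, e := range append([]*entry(nil), r.entries...) {
		if e.key != -1 && e.key != key {
			continue
		}
		var keep, drop bool
		handled := e.fn(key, func() { keep = true }, func() { drop = true })
		// Drop wins over keep; handling drops unless keep was requested.
		if drop || (handled && !keep) {
			r.remove(e)
		}
		if handled {
			return true
		}
	}
	return false
}

func main() {
	var r registry
	count := 0
	// Observer: counts every request, never handles, stays registered.
	r.add(-1, func(int16, func(), func()) bool { count++; return false })
	// One-shot handler for key 0 (Produce in the Kafka protocol).
	r.add(0, func(int16, func(), func()) bool { return true })

	first, second := r.dispatch(0), r.dispatch(0)
	fmt.Println(first, second, count, len(r.entries)) // prints "true false 2 1"
}
```

The second Produce request is not intercepted because the one-shot handler was dropped after its first use, while the observer remains registered and sees both requests.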
func (*Cluster) CoordinatorFor ¶
CoordinatorFor returns the node ID of the group or transaction coordinator for the given key.
func (*Cluster) CurrentNode ¶
CurrentNode is solely valid from within a control function; it returns the broker id that the request was received by. If there's no request currently inflight, this returns -1.
func (*Cluster) DropControl ¶
func (c *Cluster) DropControl()
DropControl removes the current control function. This takes precedence over KeepControl, allowing you to keep a control function by default but forcefully drop it when a specific condition is met.
func (*Cluster) KeepControl ¶
func (c *Cluster) KeepControl()
KeepControl marks the currently running control function to be kept even if you handle the request and return true. This can be used to continuously control requests without needing to re-add control functions manually.
func (*Cluster) LeaderFor ¶
LeaderFor returns the node ID of the leader of the topic partition. If the partition does not exist, this returns -1.
func (*Cluster) ListenAddrs ¶
ListenAddrs returns the hostports that the cluster is listening on.
func (*Cluster) MoveTopicPartition ¶
MoveTopicPartition simulates the rebalancing of a partition to an alternative broker. This returns an error if the topic, partition, or node does not exist.
func (*Cluster) PartitionInfo ¶
func (c *Cluster) PartitionInfo(topic string, partition int32) *PartitionInfo
PartitionInfo returns information about a partition if it exists.
func (*Cluster) PartitionInfos ¶
func (c *Cluster) PartitionInfos(topic string) []*PartitionInfo
PartitionInfos returns information about all partitions in a topic, if it exists. The partitions are returned in sorted partition order, with partition 0 at index 0, partition 1 at index 1, etc.
func (*Cluster) RehashCoordinators ¶
func (c *Cluster) RehashCoordinators()
RehashCoordinators simulates group and transactional ID coordinators moving around. All group and transactional IDs are rekeyed. This forces clients to reload coordinators.
func (*Cluster) RemoveNode ¶
RemoveNode removes a node from the cluster. This returns an error if the node does not exist.
func (*Cluster) SetFollowers ¶
SetFollowers sets the node IDs of brokers that can also serve fetch requests for a partition. Setting followers to an empty or nil slice reverts to the default of only the leader being able to serve fetch requests.
func (*Cluster) ShufflePartitionLeaders ¶
func (c *Cluster) ShufflePartitionLeaders()
ShufflePartitionLeaders simulates a leader election for all partitions: all partitions have a randomly selected new leader and their internal epochs are bumped.
func (*Cluster) SleepControl ¶
func (c *Cluster) SleepControl(wakeup func())
SleepControl sleeps the current control function until wakeup returns. This yields to run any other connection.
Note that per protocol, requests on the same connection must be replied to in order. Many clients write multiple requests to the same connection, so if you sleep until a different request runs, you may sleep forever -- you must know the semantics of your client to know whether requests run on different connections (or, ensure you are writing to different brokers).
For example, franz-go uses a dedicated connection for:
- produce requests
- fetch requests
- join&sync requests
- requests with a Timeout field
- all other requests
So, for franz-go, there are up to five separate connections depending on what you are doing.
You can run SleepControl multiple times in the same control function. If you sleep a request you are controlling, and another request of the same key comes in, it will run the same control function and may also sleep (i.e., you must have logic if you want to avoid sleeping on the same request).
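The blocking contract of SleepControl (wait until wakeup returns, or until the cluster is closed) can be sketched with a channel and a select. The sleeper type below is hypothetical and models only the waiting; it does not model how the cluster yields to other connections.

```go
package main

import (
	"fmt"
	"sync"
)

// sleeper is a hypothetical stand-in for the part of a cluster that parks
// control functions.
type sleeper struct {
	closeOnce sync.Once
	closed    chan struct{}
}

func newSleeper() *sleeper { return &sleeper{closed: make(chan struct{})} }

// sleep blocks until wakeup returns or the sleeper is closed, and reports
// whether wakeup finished (true) or close interrupted the wait (false).
// If close interrupts, the goroutine running wakeup is left behind.
func (s *sleeper) sleep(wakeup func()) bool {
	done := make(chan struct{})
	go func() {
		wakeup()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-s.closed:
		return false
	}
}

// close wakes every sleeper; it is safe to call more than once.
func (s *sleeper) close() { s.closeOnce.Do(func() { close(s.closed) }) }

func main() {
	s := newSleeper()
	fetchSeen := make(chan struct{})

	// A "produce" control function sleeps until a "fetch" has been seen.
	result := make(chan bool)
	go func() { result <- s.sleep(func() { <-fetchSeen }) }()

	close(fetchSeen) // the fetch arrives on a different connection
	fmt.Println(<-result)
}
```

If the fetch in this example were sent on the same connection as the sleeping produce request, it would never be read and the wait would end only on close, which is the "sleep forever" hazard noted above.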
func (*Cluster) TopicIDInfo ¶
TopicIDInfo returns the topic for a topic ID if the topic ID exists.
type LogLevel ¶
type LogLevel int8
LogLevel designates which level the logger should log at.
const (
// LogLevelNone disables logging.
LogLevelNone LogLevel = iota
// LogLevelError logs all errors. Generally, these should not happen.
LogLevelError
// LogLevelWarn logs all warnings, such as request failures.
LogLevelWarn
// LogLevelInfo logs informational messages, such as requests. This is
// usually the default log level.
LogLevelInfo
// LogLevelDebug logs verbose information, and is usually not used in
// production.
LogLevelDebug
)
type Opt ¶
type Opt interface {
// contains filtered or unexported methods
}
Opt is an option to configure a cluster.
func AllowAutoTopicCreation ¶
func AllowAutoTopicCreation() Opt
AllowAutoTopicCreation allows metadata requests to create topics if the metadata request has its AllowAutoTopicCreation field set to true.
func BrokerConfigs ¶
BrokerConfigs sets initial broker-level dynamic configs. Empty values are treated as deletes (reset to default). Configs most relevant for testing:
- group.consumer.heartbeat.interval.ms - heartbeat interval for KIP-848 consumer groups (default 5000, lowered to 100 in test binaries)
- group.consumer.session.timeout.ms - session timeout for KIP-848 consumer groups (default 45000)
- message.max.bytes - max produce batch size (default 1048588)
- transaction.max.timeout.ms - max transaction timeout (default 900000)
- max.incremental.fetch.session.cache.slots - max fetch sessions per broker (default 1000)
Other accepted configs include compression.type, default.replication.factor, min.insync.replicas, log.retention.bytes, log.retention.ms, and log.message.timestamp.type.
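To make the "empty value resets to default" rule concrete, the sketch below resolves the effective value of a config from a map of overrides. The defaults map and the effective helper are illustrative only; the two default values are the ones listed above.

```go
package main

import "fmt"

// defaults lists two of the documented broker config defaults.
var defaults = map[string]string{
	"message.max.bytes":          "1048588",
	"transaction.max.timeout.ms": "900000",
}

// effective returns the value in force for name: the override if one is
// set and non-empty, otherwise the default.
func effective(overrides map[string]string, name string) string {
	if v, ok := overrides[name]; ok && v != "" {
		return v
	}
	return defaults[name]
}

func main() {
	overrides := map[string]string{
		"message.max.bytes":          "2048", // shrink the max batch size
		"transaction.max.timeout.ms": "",     // empty: reset to default
	}
	fmt.Println(effective(overrides, "message.max.bytes"))          // prints "2048"
	fmt.Println(effective(overrides, "transaction.max.timeout.ms")) // prints "900000"
}
```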
func DefaultNumPartitions ¶
DefaultNumPartitions sets the number of partitions to create by default for auto-created topics and for CreateTopics requests that specify -1 partitions, overriding the default of 10.
func EnableACLs ¶
func EnableACLs() Opt
EnableACLs enables ACL checking. When enabled, all requests are checked against ACLs. By default, no ACLs exist, so all requests will be denied unless you configure superusers (via the Superuser option) and then add ACLs via CreateACLs as that user.
func EnableSASL ¶
func EnableSASL() Opt
EnableSASL enables SASL authentication for the cluster. If you do not configure a bootstrap user / pass, the default superuser is "admin" / "admin" with the SCRAM-SHA-256 SASL mechanism.
func GroupMaxSessionTimeout ¶
GroupMaxSessionTimeout sets the cluster's maximum session timeout allowed for groups, overriding the default 5 minutes.
func GroupMinSessionTimeout ¶
GroupMinSessionTimeout sets the cluster's minimum session timeout allowed for groups, overriding the default 6 seconds.
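The two session timeout options define an allowed range. The check below is a sketch of that range test using the documented defaults; the helper names are hypothetical, and treating both bounds as inclusive follows Apache Kafka's behavior, which the mock is assumed to share.

```go
package main

import (
	"fmt"
	"time"
)

// Documented defaults for the mock cluster.
const (
	defaultMinSession = 6 * time.Second
	defaultMaxSession = 5 * time.Minute
)

// sessionTimeoutAllowed reports whether a requested session timeout lies
// within [min, max]. Assumption: both bounds are inclusive.
func sessionTimeoutAllowed(requested, min, max time.Duration) bool {
	return requested >= min && requested <= max
}

// allowedByDefault applies the default bounds to a timeout given in
// milliseconds, the unit used by group join requests.
func allowedByDefault(timeoutMillis int32) bool {
	d := time.Duration(timeoutMillis) * time.Millisecond
	return sessionTimeoutAllowed(d, defaultMinSession, defaultMaxSession)
}

func main() {
	for _, ms := range []int32{3000, 45000, 600000} {
		fmt.Println(ms, allowedByDefault(ms))
	}
}
```

Lowering the minimum is typically what a test wants, so that a group member can be timed out within a few hundred milliseconds instead of six seconds.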
func ListenFn ¶
ListenFn sets the listener function to use, overriding net.Listen.
func MaxVersions ¶
MaxVersions sets the maximum API versions the cluster will advertise and accept. This can be used to simulate older Kafka versions. For each request key, the cluster will use the minimum of its implemented max version and the version specified in the provided Versions. If a key is not present in the provided Versions, requests for that key will be rejected.
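The version rule above reduces to a per-key minimum with rejection for missing keys. The sketch below models it with plain maps from request key to max version; the map representation, the negotiate helper, and the version numbers are illustrative and are not taken from kversion.

```go
package main

import "fmt"

// negotiate returns the highest version the cluster will accept for key.
// implemented maps request key to the highest version the cluster
// implements; capped maps request key to the configured ceiling.
// ok is false when the key is absent from either map, meaning requests
// for that key are rejected.
func negotiate(implemented, capped map[int16]int16, key int16) (version int16, ok bool) {
	impl, okImpl := implemented[key]
	limit, okCap := capped[key]
	if !okImpl || !okCap {
		return 0, false
	}
	if limit < impl {
		return limit, true
	}
	return impl, true
}

func main() {
	implemented := map[int16]int16{0: 11, 1: 16, 3: 12} // hypothetical values
	capped := map[int16]int16{0: 7, 1: 20}              // no entry for key 3

	for _, key := range []int16{0, 1, 3} {
		v, ok := negotiate(implemented, capped, key)
		fmt.Println(key, v, ok)
	}
}
```

Key 0 is capped to the configured ceiling, key 1 stays at the implemented maximum because the ceiling is higher, and key 3 is rejected because the provided versions do not include it.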
func NumBrokers ¶
NumBrokers sets the number of brokers to start in the fake cluster.
func Ports ¶
Ports sets the ports to listen on, overriding the default of choosing a random port for each of the NumBrokers brokers.
func SeedTopics ¶
SeedTopics provides topics to create by default in the cluster. Each topic will use the given partitions and use the default internal replication factor. If you use a non-positive number for partitions, DefaultNumPartitions is used. This option can be provided multiple times if you want to seed topics with different partition counts. If a topic is provided in multiple options, the last specification wins.
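How repeated SeedTopics options combine can be sketched as a fold over the options in order. The seed type and resolve helper below are hypothetical; they only model the two rules stated above (non-positive counts fall back to the default, and the last specification wins).

```go
package main

import "fmt"

// seed is a hypothetical record of one SeedTopics(partitions, topics...) call.
type seed struct {
	partitions int32
	topics     []string
}

// resolve returns the partition count each topic ends up with.
func resolve(defaultPartitions int32, seeds ...seed) map[string]int32 {
	out := make(map[string]int32)
	for _, s := range seeds {
		n := s.partitions
		if n <= 0 {
			n = defaultPartitions // non-positive: fall back to the default
		}
		for _, t := range s.topics {
			out[t] = n // later seeds overwrite earlier ones
		}
	}
	return out
}

func main() {
	got := resolve(10,
		seed{3, []string{"orders", "events"}},
		seed{-1, []string{"audit"}},
		seed{6, []string{"events"}},
	)
	fmt.Println(got["orders"], got["events"], got["audit"]) // prints "3 6 10"
}
```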
func SleepOutOfOrder ¶
func SleepOutOfOrder() Opt
SleepOutOfOrder allows functions to be handled out of order when control functions are sleeping. The functions are handled internally out of order, but responses still wait for the sleeping requests to finish. This can be used to set up complicated chains of control where functions only advance when you know another request is actively being handled.
func Superuser ¶
Superuser seeds the cluster with a superuser. The method must be either PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512. Note that PLAIN superusers cannot be deleted. SCRAM superusers can be modified with AlterUserScramCredentials. If you delete all SASL users, the kfake cluster will be unusable.
Superusers bypass all ACL checks when ACLs are enabled.
type PartitionInfo ¶
type PartitionInfo struct {
Partition int32 // Partition is the partition this info is for.
HighWatermark int64 // HighWatermark is the latest offset present in the partition.
LastStableOffset int64 // LastStableOffset is the last stable offset.
LogStartOffset int64 // LogStartOffset is the first offset present in the partition.
Epoch int32 // Epoch is the current "epoch" of the partition -- how many times the partition transferred leaders.
MaxTimestamp int64 // MaxTimestamp is the current max timestamp across all batches.
NumBytes int64 // NumBytes is the current amount of data stored in the partition.
Leader int32 // Leader is the current leader of the partition.
}
PartitionInfo contains snapshot-in-time metadata about an existing partition.
type TopicInfo ¶
type TopicInfo struct {
Topic string // Topic is the topic this info is for.
TopicID [16]byte // TopicID is the UUID of the topic.
NumReplicas int // NumReplicas is the replication factor for all partitions in this topic.
Configs map[string]*string // Configs contains all configuration values specified for this topic.
}
TopicInfo contains snapshot-in-time metadata about an existing topic.
Source Files
¶
- 00_produce.go
- 01_fetch.go
- 02_list_offsets.go
- 03_metadata.go
- 08_offset_commit.go
- 09_offset_fetch.go
- 10_find_coordinator.go
- 11_join_group.go
- 12_heartbeat.go
- 13_leave_group.go
- 14_sync_group.go
- 15_describe_groups.go
- 16_list_groups.go
- 17_sasl_handshake.go
- 18_api_versions.go
- 19_create_topics.go
- 20_delete_topics.go
- 21_delete_records.go
- 22_init_producer_id.go
- 23_offset_for_leader_epoch.go
- 24_add_partitions_to_txn.go
- 25_add_offsets_to_txn.go
- 26_end_txn.go
- 28_txn_offset_commit.go
- 29_describe_acls.go
- 30_create_acls.go
- 31_delete_acls.go
- 32_describe_configs.go
- 33_alter_configs.go
- 34_alter_replica_log_dirs.go
- 35_describe_log_dirs.go
- 36_sasl_authenticate.go
- 37_create_partitions.go
- 42_delete_groups.go
- 43_elect_leaders.go
- 44_incremental_alter_configs.go
- 45_alter_partition_assignments.go
- 46_list_partition_reassignments.go
- 47_offset_delete.go
- 48_describe_client_quotas.go
- 49_alter_client_quotas.go
- 50_describe_user_scram_credentials.go
- 51_alter_user_scram_credentials.go
- 60_describe_cluster.go
- 61_describe_producers.go
- 65_describe_transactions.go
- 66_list_transactions.go
- 68_consumer_group_heartbeat.go
- 69_consumer_group_describe.go
- 71_get_telemetry_subscriptions.go
- 72_push_telemetry.go
- acl.go
- client_conn.go
- cluster.go
- config.go
- data.go
- groups.go
- logger.go
- misc.go
- sasl.go
- topic_partition.go
- txns.go