I recently had to design an secure Apache Kafka cluster that was secured with Kerberos for user/service authentication, SSL/TLS for encryption of communications and ACLs. This wasn’t exactly straightforward, especially the Kerberos part, as I was hoping it was going to be. This blog post documents how I designed and tested the cluster – hopefully this will be of use to others.
What is Kafka?
To quote the project website (https://kafka.apache.org/):
Apache Kafka is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.
You may also see the following definition:
Apache Kafka is publish-subscribe messaging rethought as a distributed commit log
Kafka Components
Kafa utilises Apache ZooKeeper for cluster co-ordination services; specifically it is used to store metadata about the Kafka cluster and consumer client information. In ZooKeeper terminology, a cluster of ZooKeeper nodes is known as an ensemble. A ZooKeeper ensemble must contain an odd number of nodes in order to avoid split-brain in the event of a network partition.
Kafka nodes themselves are referred to as brokers. Brokers receive messages from Producers, assigns offsets to the messages and commits the messages to disk storage. Brokers also service consumers, responding with the requested messages from a given offset.
In Kafka messages are categorised into topics. Topics can be further split into partitions that can reside on different servers. The default number of partitions for the Kafka cluster. Partitioning provides fault-tolerance as well as the ability to scale throughput.
It is important to note that Kafka only provides time ordering of messages within a single partition. Time ordering is not provided across an entire topic that has multiple partitions.
A Producer application reads data from a source system, then write the data to a Kafka topic. A Consumer application then subscribes to one or more topics and read messages from the topic partitions in the order they were produced.
Consumers in the same consumer group will read messages from the same topic but different partitions thereby increasing read throughput. Consumers keep track of offsets in either Kafka itself or ZooKeeper, which allows consumers to recover after a failure and continue from the last message read.
MirrorMaker is a tool that can be used to replicate data to a remote cluster. MirrorMaker essentially implements a consumer that reads data from a Kafka topic in one cluster and a producer to republish the messages to a topic in a different cluster. If high availability is required for this component, two servers can run one instance of MirrorMaker each configured in the same consumer group. Each instance will get half of the traffic mirroring to the other data centre since they have consumers in the same group. If a MirrorMaker becomes unavailable, the other instance will automatically pick up the load.
Design Considerations
ZooKeeper
A cluster of Zookeeper nodes, known as an ensemble, is used by Kafka for cluster coordination and to store metadata about brokers, topics and partitions. If you have a Hadoop cluster then you will already have ZooKeeper as many Hadoop ecosystem tools utilise it. However, there are a number of reasons why it may not be a good idea to use the ZooKeeper ensemble used by your Hadoop cluster, including:
Using the Hadoop Zookeeper ensemble introduces an unnecessary coupling between Hadoop and Kafka.
- Sharing an ensemble results in maintenance operations on a Hadoop cluster having the potential to impact Kafka.
- Kafka can be negatively impacted by Zookeeper latency and timeouts; using the Hadoop Zookeeper ensemble increases the risk of increased latency and timeouts.
In order to avoid split-brain scenarios an odd number of nodes in the Zookeeper cluster are required. The minimum number is 3 nodes, which would allow for one failure, 5 nodes are deployed as this allows for failures and maintenance activities. However, 5 nodes increase write latency as more nodes are involved. If you plan to have multiple clusters across different data centres, it would be advisable to use separate ZooKeeper clusters -as per the recommendations in the ZooKeeper documentation:
“Forming a ZK ensemble between two datacenters is a problematic endeavour as the high variance in latency between the datacenters could lead to false positive failure detection and partitioning. However, if the ensemble runs entirely in one datacenter, and the second datacenter runs only Observers, partitions aren’t problematic as the ensemble remains connected. Clients of the Observers may still see and issue proposals”
Data formats
Kafka stores and transfers messages as a byte array and so the structure of the message is transparent to Kafka brokers. However, it is recommended that structure is imposed on messages as a consistent data format allows the writing and reading of messages to be decoupled. At present Apache Avro, a serialisation framework developed for Hadoop, is the most widely used with Kafka as it provides a number of features including:
- A compact serialisation format
- Separation of schemas from the message payload
- Strong typing and schema evolution
Delivery Semantics
It should be noted that the default behaviour of Kafka provides at-least-once delivery semantics (one or more deliveries of the message). In this case Kafka guarantees that no messages are lost but duplicates are possible.
By default the Kafka consumer auto-commits offsets (enable.auto.commit is set to true), whereby the Consumer will commit at intervals defined by auto.commit.interval.ms property (default is 60 seconds). The probability of duplicate messages can be reduced by lowering the auto.commit.interval.ms property and at the same time using consumer.commitSync to also commit offsets at intervals that make sense to the application.
Example of how to build the cluster (development environment)
To simplify things I’m going to show the steps for a 3 node development cluster where ZooKeeper and Kafka brokers are running on the same servers but you can easily extend this to an environment where they reside on separate infrastructure. The instructions assume the servers are domain joined (Active Directory).
ZooKeeper Install
Ensure the server has been domain joined and the krb5-workstation package is installed
Install JDK 1.8 u51
export JDK_VERSION="jdk-8u51-linux-x64"
export JDK_RPM="$JDK_VERSION.rpm"
wget -O /tmp/$JDK_RPM --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u51-b16/$JDK_RPM
rpm -ivh /tmp/$JDK_RPM
Install the Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files
wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jce/8/jce_policy-8.zip"
unzip jce_policy-8.zip
mv /usr/java/default/jre/lib/security/local_policy.jar /usr/java/default/jre/lib/security/local_policy.jar.backup
mv /usr/java/default/jre/lib/security/US_export_policy.jar /usr/java/default/jre/lib/security/US_export_policy.jar.backup
mv UnlimitedJCEPolicyJDK8/*.jar /usr/java/default/jre/lib/security/
rm -f jce_policy-8.zip
Download ZooKeeper 3.4.6 from http://zookeeper.apache.org/releases.html#download
Extract the gzip file
tar -zxf zookeeper-3.4.6.tar.gz
Move the ZooKeeper directory to /usr/local and create a symlink
Creating a symlink makes it easier to switch between different ZooKeeper versions.
mv zookeeper-3.4.6 /usr/local/
ln -s /usr/local/zookeeper-3.4.6 /usr/local/zookeeper
Create a user account under which ZooKeeper will run
export ZOOKEEPER_USER='zookeeper'
export ZOOKEEPER_GROUP='zookeeper'
/usr/sbin/groupadd -r $ZOOKEEPER_GROUP &>/dev/null
id $ZOOKEEPER_USER &>/dev/null || useradd -s /sbin/nologin -r -M $ZOOKEEPER_USER -g $ZOOKEEPER_GROUP -c "Zookeeper Server"
usermod $ZOOKEEPER_USER -d /zookeeper
Note the /zookeeper directory should be a mount on a separate disk.
Create a file named myid under the path /zookeeper/data/ containing the id of the server (refer to the Blueprint for information on the zookeeper id). The id must be unique in the cluster.
Any easy way to generate an id is to use the suffix of the server hostname.
export VMSuffix=$(hostname -s | rev | cut -d"-" -f1 | rev)
cat > /zookeeper/data/myid <<EOF
$VMSuffix
EOF
Create a file named zoo.cfg under /usr/local/zookeeper/conf with the following content
touch /usr/local/zookeeper/conf/zoo.cfg
cat > /usr/local/zookeeper/conf/zoo.cfg <<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/zookeeper/data
dataLogDir=/zookeeper/logs
server.X=<replace with fqdn of node 1>:2888:3888
server.X=<replace with fqdn of node 2>:2888:3888
server.X=<replace with fqdn of node 3>:2888:3888
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
kerberos.removeHostFromPrincipal=true
kerberos.removeRealmFromPrincipal=true
EOF
This file contains the ZooKeeper server configuration information. The above configuration assumes a 3 node ZooKeeper ensemble. Note the “x” in server.x will need to be replaced with the id of the server.
Create chroot path for Kafka
mkdir -p /zookeeper/<country code>clst<nn>kakfa
You should use a naming scheme that makes sense in your environment, the one here assumes:
Country code e.g. UK, US etc.
nn is a numeric cluster identifier, e.g. 01
Create a file named java.env under the path /usr/local/zookeeper/conf with the following content.
This file is used to pass in extra flags when starting ZooKeeper via environment variables. In this case we specify a Java Authentication and Authorization Service (JAAS) configuration file that tells ZooKeeper how to authenticate. The second parameter enables debug mode for Kerberos, which cause ZooKeeper to include information on Kerberos related errors (e.g. authentication failed etc.).
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=/usr/local/zookeeper/conf/zookeeper_jaas.conf -Dsun.security.krb5.debug=true"
Modify the log4.properties file under the path /usr/local/zookeeper/conf, to replace the first 9 lines which are:
# Define some default values that can be overridden by system properties
zookeeper.root.logger=INFO, CONSOLE
zookeeper.console.threshold=INFO
zookeeper.log.dir=.
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=DEBUG
zookeeper.tracelog.dir=.
zookeeper.tracelog.file=zookeeper_trace.log
With
# Define some default values that can be overridden by system properties
zookeeper.root.logger=INFO, CONSOLE, ROLLINGFILE
zookeeper.console.threshold=INFO
zookeeper.log.dir=/var/log/zookeeper
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=INFO
zookeeper.tracelog.dir=/var/log/zookeeper
zookeeper.tracelog.file=zookeeper_trace.log
Once ZooKeeper is built and tested the logging can be turned down e.g. replace the INFO (in red above) with WARN.
Create a systemd unit file for ZooKeeper /etc/systemd/system/zookeeper.service
[Unit]
Description=Zookeeper distributed coordination server
Documentation=http://zookeeper.apache.org
After=network.target
[Service]
Type=forking
User=zookeeper
Group=zookeeper
SyslogIdentifier=zookeeper
WorkingDirectory=/zookeeper/data
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start /usr/local/zookeeper/conf/zoo.cfg
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart
TimeoutSec=30
Restart=always
[Install]
WantedBy=multi-user.target
The command below enables the ZooKeeper service
systemctl enable zookeeper
Do not start the service until you have created the AD user and Kerberos keytabs, otherwise the service will fail to start (the process to do this is described in the next section).
Create AD User and Kerberos Keytab for ZooKeeper Server
Each ZooKeeper server requires an account in AD to be created and a keytab created on the server itself.
Create accounts in Active Directory for ZooKeeper using the following format that takes elements from the server naming convention.
- SamAccountName: zkCCDDDSSSNNNN, where
- CC is the country code e.g. GB, US
- DDD is the data centre identifier
- SSS is the server type e.g. SVV, SVB, SVR
- NNNN is the numeric suffix that
- UserPrincipalName: zookeeper/<server fqdn>@<REALM> , where
- Server FQDN is the fully qualified domain name of the server
- REALM is the AD domain, in this example we use MYDOMAIN
The naming scheme should be modified to suit your needs.
$SecPaswd= ConvertTo-SecureString -String 'AComplex20CharPasswordWithNumLettersAndPunctuation' -AsPlainText -Force
New-ADUser -Name “ZooKeeper Server GB DC1” -SamAccountName zkGBDCSVV0001 -UserPrincipalName zookeeper/[email protected] -Path “OU=kafka,DC=MYDOMAIN,DC=co,DC=uk” -Enabled $true -AccountPassword $SecPaswd
The password should be set to never expire:
Set-ADUser -Identity zkGBWATSVV0001 -PasswordNeverExpires $true
Then create a SPN, this step is important, if it is skipped it will lead to Kerberos errors stating that the server does not exist in the database.
get-aduser zkGBDC1SVV0001 | set-aduser -ServicePrincipalNames @{Add=zookeeper/ gb-dc1-svv-0001.mydomain.co.uk} -Server GB-DC1-SVV-1101.mydomain.co.uk
Ensure that there are forward (e.g. hostname to IP) and reverse (IP to hostname) DNS records in AD DNS. This is another common cause of Kerberos errors.
SSH to the ZooKeeper server
Create the directory keytabs under /etc/security
Note, since this is a development cluster where Kafka and ZooKeeper are installed on the same server, we create subdirectories under /etc/security/keytabs for Kafka and ZooKeeper; and modify the permissions accordingly. E.g. the /etc/security/keytabs directory will need to be owned by the user kafka and the group zookeeper.
mkdir -p /etc/security/keytabs/
cd /etc/security/keytabs
chown -R kafka:kafka /etc/security/keytabs
chmod 750 /etc/security/keytabs
Create the keytab for the AD principal that you created in step 1, entering the password when prompted
ktutil
addent -password -p zookeeper/[email protected] -k 1 -e rc4-hmac
wkt zookeeper_server.keytab
quit
Modify the permissions of the keytab
chown zookeeper:zookeeper zookeeper_server.keytab
chmod 400 zookeeper_server.keytab
Create JAAS Configuration File for ZooKeeper Server
The Java Authentication and Authorization Service (JAAS) was introduced as an optional package (extension) to the Java 2 SDK. JAAS implements a Java version of the standard Pluggable Authentication Module (PAM) framework. Kafka and ZooKeeper use JAAS for authentication to authenticate against Kerberos.
After having created the user accounts in AD and a keytab file, we must now create a JAAS configuration file and configure ZooKeeper to use it.
In the directory /usr/local/zookeeper/conf, create a file named zookeeper_jaas.conf, ensure the file is owned by the user and group zookeeper
Enter the following in the file, changing the part in read (the hostname and realm) accordingly:
Server {
com.sun.security.auth.module.Krb5LoginModule required debug=true
useKeyTab=true
storeKey=true
doNotPrompt=true
useTicketCache=false
keyTab="/etc/security/keytabs/zookeeper/zookeeper_server.keytab"
principal="zookeeper/[email protected]";
};
Note the semicolon should only appear on the last line within the curly braces and after the curly braces. Here is a brief explanation of the options, although most of these should be self-explanatory:
- The first line specifies Kerberos is used for authentication and “debug=true” enables debug mode for Kerberos which is useful for troubleshooting authentication issues.
- The line “useKeyTab=true” specifies that a keytab should be used to authenticate (as opposed to prompting for a username and password).
- The next line specifies that the principal’s key will be stored in the subject’s private credentials.
- The next line prevents prompting for credentials in the event the keytab does not work or the
- credentials cannot be obtained from the Kerberos cache.
- The next line specifies that the TGT should not be fetched from cache
- The next line specifies the path to the keytab file.
- The last line specifies the name of the principal that should be used for authentication
Create AD User and Kerberos Keytab for ZooKeeper Client
This user account’s keytab will be used when using the command line tools to connect to ZooKeeper for administration and troubleshooting purposes – this is not for application use. Create one client account per ZooKeeper server.
Create accounts in Active Directory for ZooKeeper using the following format that takes elements from the server naming convention.
- SamAccountName: ZkClientCCDDDNNNN, where
- CC is the country code e.g. GB, US
- DDD is the data centre identifier e.g. DC1, DC2, etc
- NNNN is the numeric suffix that
- UserPrincipalName: ZkClient/<server fqdn>@<REALM> , where
- Server FQDN is the fully qualified domain name of the server
- REALM is MYDOMAIN.CO.UK
The password should be set to never expire.
Then create a SPN, this step is important, if it is skipped it will lead to Kerberos errors stating that the server does not exist in the database.
$SecPaswd= ConvertTo-SecureString -String 'AComplex20CharPasswordWithNumLettersAndPunctuation' -AsPlainText -Force
New-ADUser -Name “ZooKeeper Client GB DC1” -SamAccountName ZkClientGBDC10001 -UserPrincipalName ZkClient/[email protected] -Path “OU=kafka,DC=mydomain,DC=co,DC=uk” -Enabled $true -AccountPassword $SecPaswd
Set-ADUser -Identity ZkClientGBDC10001 -PasswordNeverExpires $true
get-aduser ZkClientGBDC10001 | set-aduser -ServicePrincipalNames @{Add=ZkClient/ gb-dc1-svv-0001.mydomain.co.uk} -Server GB-DC1-SVV-1101.mydomain.co.uk
Ensure that there are forward (e.g. hostname to IP) and reverse (IP to hostname) DNS records in AD DNS. This is another common cause of Kerberos errors.
SSH to the ZooKeeper server
Create the keytab for the AD principal that you created in step 1, entering the password when prompted
Ktutil
addent -password -p ZkClient/[email protected] -k 1 -e rc4-hmac
wkt zookeeper_client.keytab
quit
Modify the permissions of the keytab
chown zookeeper:zookeeper zookeeper_client.keytab
chmod 400 zookeeper_client.keytab
Modify the JAAS Configuration File for ZooKeeper Server
Having created the ZooKeeper client credentials we must modify the JAAS configuration file on the ZooKeeper server.
SSH to the ZooKeeper server
Edit the file zookeeper_jaas.conf
Add a section for the client configuration, an example is given below (don’t delete the existing content), changing the part in read (the hostname and realm) accordingly:
Client {
com.sun.security.auth.module.Krb5LoginModule required debug=true
useKeyTab=true
storeKey=true
doNotPrompt=true
useTicketCache=false
keyTab="/etc/security/keytabs/zookeeper_client.keytab"
principal=" ZkClient/[email protected]";
};
Testing ZooKeeper after Installation and Enabling Kerberos Authentication
ZooKeeper is shipped with command line tool named zkCli.sh, this can be used to test connectivity to your Kerberised ZooKeeper ensemble.
Run the tool with the -server parameter and a comma separated list of your ZooKeeper and port.
/usr/local/zookeeper/bin/zkCli.sh -server ZooKeeperServer1:2181, ZooKeeperServer2:2181, ZooKeeperServer3:2181
Example output is shown with comments inserted.
[root@DSO-DHI-ZK-0001 config]# /usr/local/zookeeper/bin/zkCli.sh -server DSO-DHI-ZK-0001:2181,DSO-DHI-ZK-0002:2181,DSO-DHI-ZK-0003:2181
Connecting to DSO-DHI-ZK-0001:2181,DSO-DHI-ZK-0002:2181,DSO-DHI-ZK-0003:2181
2016-07-02 14:32:08,678 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2016-07-02 14:32:08,688 [myid:] - INFO [main:Environment@100] - Client environment:host.name=dso-dhi-zk-0001.dh.local
2016-07-02 14:32:08,688 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.8.0_51
2016-07-02 14:32:08,697 [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2016-07-02 14:32:08,697 [myid:] - INFO [main:Environment@100] - Client environment:java.home=/usr/java/jdk1.8.0_51/jre
2016-07-02 14:32:08,697 [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/usr/local/zookeeper/bin/../build/classes:/usr/local/zookeeper/bin/../build/lib/*.jar:/usr/local/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/usr/local/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/usr/local/zookeeper/bin/../lib/netty-3.7.0.Final.jar:/usr/local/zookeeper/bin/../lib/log4j-1.2.16.jar:/usr/local/zookeeper/bin/../lib/jline-0.9.94.jar:/usr/local/zookeeper/bin/../zookeeper-3.4.6.jar:/usr/local/zookeeper/bin/../src/java/lib/*.jar:/usr/local/zookeeper/bin/../conf:
2016-07-02 14:32:08,698 [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2016-07-02 14:32:08,698 [myid:] - INFO [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2016-07-02 14:32:08,698 [myid:] - INFO [main:Environment@100] - Client environment:java.compiler=<NA>
2016-07-02 14:32:08,698 [myid:] - INFO [main:Environment@100] - Client environment:os.name=Linux
2016-07-02 14:32:08,698 [myid:] - INFO [main:Environment@100] - Client environment:os.arch=amd64
2016-07-02 14:32:08,698 [myid:] - INFO [main:Environment@100] - Client environment:os.version=3.10.0-327.18.2.el7.x86_64
2016-07-02 14:32:08,698 [myid:] - INFO [main:Environment@100] - Client environment:user.name=root
2016-07-02 14:32:08,698 [myid:] - INFO [main:Environment@100] - Client environment:user.home=/root
2016-07-02 14:32:08,698 [myid:] - INFO [main:Environment@100] - Client environment:user.dir=/usr/local/kafka_2.11-0.10.0.0/config
2016-07-02 14:32:08,706 [myid:] - INFO [main:ZooKeeper@438] - Initiating client connection, connectString=DSO-DHI-ZK-0001:2181,DSO-DHI-ZK-0002:2181,DSO-DHI-ZK-0003:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@3eb07fd3
Welcome to ZooKeeper!
Debug is true storeKey true useTicketCache true useKeyTab true doNotPrompt true ticketCache is null isInitiator true KeyTab is /etc/security/keytabs/zookeeper_client.keytab refreshKrb5Config is false principal is ZkClient/dso-dhi-zk-0001.dh.local tryFirstPass is false useFirstPass is false storePass is false clearPass is false
JLine support is enabled
Acquire TGT from Cache
Credentials are no longer valid
Principal is ZkClient/[email protected]
null credentials from Ticket Cache
[zk: DSO-DHI-ZK-0001:2181,DSO-DHI-ZK-0002:2181,DSO-DHI-ZK-0003:2181(CONNECTING) 0] principal is ZkClient/[email protected]
Will use keytab
Commit Succeeded
2016-07-02 14:32:09,351 [myid:] – INFO [main-SendThread(dso-dhi-zk-0001.dh.local:2181):Login@293] – successfully logged in.
2016-07-02 14:32:09,353 [myid:] – INFO [Thread-1:Login$1@127] – TGT refresh thread started.
2016-07-02 14:32:09,363 [myid:] – INFO [main-SendThread(dso-dhi-zk-0001.dh.local:2181):ZooKeeperSaslClient$1@285] – Client will use GSSAPI as SASL mechanism.
2016-07-02 14:32:09,388 [myid:] – INFO [Thread-1:Login@301] – TGT valid starting at: Sat Jul 02 14:32:09 UTC 2016
2016-07-02 14:32:09,388 [myid:] – INFO [Thread-1:Login@302] – TGT expires: Sun Jul 03 00:32:09 UTC 2016
2016-07-02 14:32:09,388 [myid:] – INFO [Thread-1:Login$1@181] – TGT refresh sleeping until: Sat Jul 02 22:53:43 UTC 2016
2016-07-02 14:32:09,392 [myid:] – INFO [main-SendThread(dso-dhi-zk-0001.dh.local:2181):ClientCnxn$SendThread@975] – Opening socket connection to server dso-dhi-zk-0001.dh.local/10.1.3.4:2181. Will attempt to SASL-authenticate using Login Context section ‘Client’
2016-07-02 14:32:09,403 [myid:] – INFO [main-SendThread(dso-dhi-zk-0001.dh.local:2181):ClientCnxn$SendThread@852] – Socket connection established to dso-dhi-zk-0001.dh.local/10.1.3.4:2181, initiating session
2016-07-02 14:32:09,426 [myid:] – INFO [main-SendThread(dso-dhi-zk-0001.dh.local:2181):ClientCnxn$SendThread@1235] – Session establishment complete on server dso-dhi-zk-0001.dh.local/10.1.3.4:2181, sessionid = 0x15597ee807f02fd, negotiated timeout = 30000
[COMMENT: At this point we have a SASL/Kerberos authenticated session.]
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
WATCHER::
WatchedEvent state:SaslAuthenticated type:None path:null
[ COMMENT: You may have to press enter at this point in order to see the line below]
[zk: DSO-DHI-ZK-0001:2181,DSO-DHI-ZK-0002:2181,DSO-DHI-ZK-0003:2181(CONNECTED) 0]
[ COMMENT: List znodes]
[zk: DSO-DHI-ZK-0001:2181,DSO-DHI-ZK-0002:2181,DSO-DHI-ZK-0003:2181(CONNECTED) 1] ls /
[zookeeper, gbclst01kakfa;DSO-DHI-ZK-0002:2181, gbclst01kakfa]
[controller_epoch, controller, brokers, admin, isr_change_notification, consumers, config]
[ COMMENT: List gbclst01kakfa znode]
[zk: DSO-DHI-ZK-0001:2181,DSO-DHI-ZK-0002:2181,DSO-DHI-ZK-0003:2181(CONNECTED) 3] ls /gbclst01kakfa/brokers
[ids, topics, seqid]
[ COMMENT: List the broker ids – we have three Kafka brokers connected to this ZooKeeper ensemble]
[zk: DSO-DHI-ZK-0001:2181,DSO-DHI-ZK-0002:2181,DSO-DHI-ZK-0003:2181(CONNECTED) 4] ls /gbclst01kakfa/brokers/ids
[1, 2, 3]
[zk: DSO-DHI-ZK-0001:2181,DSO-DHI-ZK-0002:2181,DSO-DHI-ZK-0003:2181(CONNECTED) 5]
[COMMENT: type “help” to see a list of commands.]
[zk: DSO-DHI-ZK-0001:2181,DSO-DHI-ZK-0002:2181,DSO-DHI-ZK-0003:2181(CONNECTED) 12] help
ZooKeeper -server host:port cmd args
stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
[COMMENT: type “close” to close your session.]
[zk: DSO-DHI-ZK-0001:2181,DSO-DHI-ZK-0002:2181,DSO-DHI-ZK-0003:2181(CONNECTED) 13] close
2016-07-02 14:42:53,105 [myid:] – INFO [main:ZooKeeper@684] – Session: 0x15597ee807f02fd closed
[zk: DSO-DHI-ZK-0001:2181,DSO-DHI-ZK-0002:2181,DSO-DHI-ZK-0003:2181(CLOSED) 14] 2016-07-02 14:42:53,105 [myid:] – INFO [main-EventThread:ClientCnxn$EventThread@512] – EventThread shut down
[COMMENT: type “quit” to exit the zkCli shell.]
[zk: DSO-DHI-ZK-0001:2181,DSO-DHI-ZK-0002:2181,DSO-DHI-ZK-0003:2181(CLOSED) 14] quit
Quitting…
[root@DSO-DHI-ZK-0001 config]#
Kafka Brokers
Once we have ZooKeeper configured we can install and configure Kafka brokers
Kafka software dependencies:
- Java Development Kit
- Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files
Installation
Ensure the server has been domain joined and the krb5-workstation package is installed
Install JDK 1.8 u51
export JDK_VERSION="jdk-8u51-linux-x64"
export JDK_RPM="$JDK_VERSION.rpm"
wget -O /tmp/$JDK_RPM --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u51-b16/$JDK_RPM
rpm -ivh /tmp/$JDK_RPM
Install the Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files
wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jce/8/jce_policy-8.zip"
unzip jce_policy-8.zip
mv /usr/java/default/jre/lib/security/local_policy.jar /usr/java/default/jre/lib/security/local_policy.jar.backup
mv /usr/java/default/jre/lib/security/US_export_policy.jar /usr/java/default/jre/lib/security/US_export_policy.jar.backup
mv UnlimitedJCEPolicyJDK8/*.jar /usr/java/default/jre/lib/security/
rm -f jce_policy-8.zip
Download Kafka 0.10.0.0 from http://kafka.apache.org/downloads.html
wget http://www-eu.apache.org/dist/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
wget https://dist.apache.org/repos/dist/release/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz.md5
cat kafka_2.11-0.10.0.0.tgz.md5 | cut -d”:” -f2 | tr -d ‘ ‘ | awk ‘{print tolower($0)}’
md5sum kafka_2.11-0.10.0.0.tgz
Extract the gzip file
tar -zxf kafka_2.11-0.10.0.0.tgz
Move the Kafka directory to /usr/local and create a symlink
mv kafka_2.11-0.10.0.0 /usr/local/
ln -s /usr/local/kafka_2.11-0.10.0.0 /usr/local/kafka
Creating a symlink makes it easier to switch between different Kafka versions.
Create the Kafka data directories as per the blueprint, in this example we are create 3 data directories. These should be mount points for separate disks to improve performance.
export KAFKA_DIR='/kafka'
mkdir -p $KAFKA_DIR/data{1,2,3}
mkdir -p $KAFKA_DIR/logs
Create a user account under which Kafka will run
export KAFKA_USER='kafka'
export KAFKA_GROUP='kafka'
export KAFKA_HOME='/usr/local/kafka'
/usr/sbin/groupadd -r $KAFKA_GROUP &>/dev/null
id $KAFKA_USER &>/dev/null || useradd -s /sbin/nologin -r -M $KAFKA_USER -g $KAFKA_GROUP -c “Kafka Broker”
usermod $KAFKA_USER -d “$KAFKA_DIR”
Ensure the OS tuning guidelines are applied as per the blueprint, an example is provided below for how to apply these settings.
touch /etc/sysctl.d/kafka.conf
echo 0 > /proc/sys/vm/swappiness
sysctl -w vm.swappiness=0
echo vm.swappiness=0 >> /etc/sysctl.d/kafka.conf
echo “vm.dirty_ratio=5” >> /etc/sysctl.d/kafka.conf
sysctl -w vm.dirty_ratio=5
echo “vm.dirty_background_bytes=60” >> /etc/sysctl.d/kafka.conf
sysctl -w vm.dirty_background_bytes=60
echo “Setting Auto-tuning read/write TCP buffer limits”
sysctl -w net.ipv4.tcp_rmem=’4096 65536 2048000′
echo ‘net.ipv4.tcp_rmem = 4096 65536 2048000′ >> /etc/sysctl.d/kafka.conf
sysctl -w net.ipv4.tcp_wmem=’4096 65536 2048000’
echo ‘net.ipv4.tcp_rmem = 4096 65536 2048000’ >> /etc/sysctl.d/kafka.conf
echo “Setting Socket buffer defaults and limits”
sysctl -w net.core.wmem_default=131072
echo ‘net.core.wmem_default=131072’ >> /etc/sysctl.d/kafka.conf
sysctl -w net.core.rmem_default=131072
echo ‘net.core.rmem_default=131072’ >> /etc/sysctl.d/kafka.conf
sysctl -w net.core.wmem_max=2097152
echo ‘net.core.wmem_max=2097152’ >> /etc/sysctl.d/kafka.conf
sysctl -w net.core.rmem_max=2097152
echo ‘net.core.rmem_max=2097152’ >> /etc/sysctl.d/kafka.conf
echo “Setting net.ipv4.tcp_window_scaling to 1 (enabling)”
sysctl -w net.ipv4.tcp_window_scaling=1
echo ‘net.ipv4.tcp_window_scaling = 1′ >> /etc/sysctl.d/kafka.conf
sysctl -w net.ipv4.tcp_wmem=’4096 65536 2048000’
echo ‘net.ipv4.tcp_rmem = 4096 65536 2048000’ >> /etc/sysctl.d/kafka.conf
echo “Setting the TCP SYN (half-open) connection backlog to 4096”
sysctl -w net.ipv4.tcp_max_syn_backlog=4096
echo ‘net.ipv4.tcp_max_syn_backlog = 4096’ >> /etc/sysctl.d/kafka.conf
sysctl -w net.core.netdev_max_backlog=30000
echo ‘net.core.netdev_max_backlog = 30000’ >> /etc/sysctl.d/kafka.conf
In the directory /usr/local/kafka/config/, create a file named server.properties, this file will contain the Kafka configuration. Refer to the blueprint for the contents of the configuration file and the properties to be set, an example is provided below.
This example uses the numeric suffix of the VM for the broker id. Note that fully qualified server names are used at all times. In addition, we specify SASL_PLAINTEXT, which signifies an authenticated but unencrypted connection. Later in this document we will outline how to enable SSL.
export VMFQDN=$(hostname -f)
export VMSuffix=$(hostname -s | rev | cut -d"-" -f1 | rev)
cat > /usr/local/kafka/config/server.properties <<EOF
broker.id=$VMSuffix
listeners = SASL_PLAINTEXT://$VMFQDN:9092
advertised.listeners=SASL_PLAINTEXT://$VMFQDN:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
zookeeper.connect=<zookeeperServerFQDN1>:2181, <zookeeperServerFQDN2>:2181, <zookeeperServerFQDN3>:2181/gbclst01kakfa
log.dirs=/kafka/data1,/kafka/data2,/kafka/data3
num.reocvery.threads.per.data.dir=4
auto.create.topics=false
num.partitions=1
log.retention.ms=86400000
log.segment.bytes=1073741824
max.message.bytes=1000000
message.max.bytes=1000000
replica.fetch.max.bytes=1000000
delete.topic.enable=true
controlled.shutdown.enable=true
secure=false
reserved.broker.max.id=3000
EOF
Create a krb5.conf file for your domain and place it under /usr/local/kafka/config. Ensure the file contains a section named “[domain_realm]”
[domain_realm]
mydomain.co.uk = MYDOMAIN.CO.UK
Where the AD domain on the left must be in lowercase and the realm in upper case on the right.
Modify the log4.properties file under the path /usr/local/kafka/config, and after the comments add the line in red below to modify the location of Kafka log files.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
kafka.logs.dir=/var/log/kafka
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
Once Kafka is built and tested the logging can be turned down e.g. replace the INFO with WARN in the file above.
Create a systemd unit file for Kafka etc/systemd/system/kafka.service
[Unit]
Description=Kafka publish-subscribe messaging system
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
After=network.target zookeeper.service
[Service]
User=kafka
Group=kafka
SyslogIdentifier=kafka
WorkingDirectory=/kafka
ExecStart=/usr/bin/java \
-Xmx1G -Xms1G -server \
-XX:+UseCompressedOops \
-XX:+UseParNewGC \
-XX:+UseConcMarkSweepGC \
-XX:+CMSClassUnloadingEnabled \
-XX:+CMSScavengeBeforeRemark \
-XX:+DisableExplicitGC \
-Djava.awt.headless=true \
-Xloggc:/var/log/kafka/kafkaServer-gc.log \
-verbose:gc \
-XX:+PrintGCDetails \
-XX:+PrintGCDateStamps \
-XX:+PrintGCTimeStamps \
-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_jaas.conf \
-Dkafka.logs.dir=/var/log/kafka \
-Dlog4j.configuration=file:/usr/local/kafka/config/log4j.properties \
-Djava.security.krb5.conf=/usr/local/kafka/config/krb5.conf \
-cp /usr/local/kafka/libs/* \
kafka.Kafka \
/usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
TimeoutSec=15
Restart=on-failure
[Install]
WantedBy=multi-user.target
NOTE: The dependency on ZooKeeper in the systemd unit file is only required on servers where you host Kafka and ZooKeeper on the same servers.
The command below enables the Kafka service
Do not start the service until you have created the AD user and Kerberos keytabs, otherwise the service will fail to start (the process of this is described in the next section).
systemctl enable kafka
Create AD User and Kerberos Keytab for Kafka Broker
Each Kafka server requires an account in AD to be created and a keytab created on the server itself.
Create accounts in Active Directory for Kafka using a naming scheme that is appropriate for your environment.
- SamAccountName: kfCCDDDSSSNNNN, where
- CC is the country code e.g. GB, US
- DDD is the data centre identifier e.g. DC1, DC2, etc
- SSS is the server type e.g. SVV, SVB, SVR
- NNNN is the numeric suffix that
- UserPrincipalName: kafka/<server fqdn>@<REALM> , where
- Server FQDN is the fully qualified domain name of the server
- REALM is MYDOMAIN.CO.UK
The password should be set to never expire:
$SecPaswd= ConvertTo-SecureString -String 'AComplex20CharPasswordWithNumLettersAndPunctuation' -AsPlainText -Force
New-ADUser -Name “Kafka Server GB DC1” -SamAccountName kfGBDC1SVV0001 -UserPrincipalName kafka/[email protected] -Path “OU=kafka,DC=mydomain,DC=co,DC=uk” -Enabled $true -AccountPassword $SecPaswd
Set-ADUser -Identity kfGBDC1SVV0001 -PasswordNeverExpires $true
Then create a SPN, this step is important, if it is skipped it will lead to Kerberos errors stating that the server does not exist in the database.
get-aduser kfGBDC1SVV0001 | set-aduser -ServicePrincipalNames @{Add=kafka/ gb-dc1-svv-0001.mydomain.co.uk} -Server GB-DC1-SVV-1101.mydomain.co.uk
Ensure that there are forward (e.g. hostname to IP) and reverse (IP to hostname) DNS records in AD DNS. This is another common cause of Kerberos errors.
SSH to the Kafka server
Create the directory keytabs under /etc/security
mkdir -p /etc/security/keytabs/
cd /etc/security/keytabs
chown -R kafka:kafka /etc/security/keytabs
chmod 750 /etc/security/keytabs
Note, since this is a development cluster where Kafka and ZooKeeper are installed on the same server, we create subdirectories under /etc/security/keytabs for Kafka and ZooKeeper; and modify the permissions accordingly.
Create the keytab for the AD principal that you createad in step 1, entering the password when prompted
ktutil
addent -password -p kafka/[email protected] -k 1 -e rc4-hmac
wkt kafka_server.keytab
quit
Modify the permissions of the keytab
chown kafka:kafka kafka_server.keytab
chmod 400 kafka_server.keytab
Create JAAS Configuration File for Kafka
After having created the user accounts in AD and a keytab file, we must now create a JAAS configuration file and configure Kafka to use it.
In the directory /usr/local/kafka/config, create a file named kafka_jaas.conf, ensure the file is owned by the user and group kafka.
Enter the following in the file, changing the part in red (the hostname and realm) accordingly:
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required debug=true
useKeyTab=true
storeKey=true
doNotPrompt=true
useTicketCache=false
serviceName=kafka
keyTab="/etc/security/keytabs/kafka/kafka_server.keytab"
principal="kafka/<KafkaServerFQDN>@MYDOMAIN.CO.UK ";
};
// ZooKeeper client authentication
Client {
com.sun.security.auth.module.Krb5LoginModule required debug=true
useKeyTab=true
storeKey=true
doNotPrompt=true
useTicketCache=false
serviceName=kafka
keyTab="/etc/security/keytabs/kafka/kafka_server.keytab"
principal="kafka/<KafkaServerFQDN>@MYDOMAIN.CO.UK";
};
Testing Kafka after Installation and Enabling Kerberos Authentication
Kafka is shipped with command line tool named kafka-topics.sh, this can be used to test Kafka (and therefore Kafka – ZooKeeper communications).
Run the tool with the -zookeeper parameter and specify one of the zookeeper servers and port, the name after the slash is the chroot path; and lastly add the -list parameter, to list all topics. In a new cluster there will be no topics but this will still test that Kafka and ZooKeeper are functioning.
You can specify a comma separated list of zookeeper servers, if you do the script will try to connect to each zookeeper in turn if it cannot connect to the first.
/usr/local/kafka/bin/kafka-topics.sh –zookeeper <ZooKeeperServer>:2181/<chroot path> –list
In the next example we use the same tool to create topic named test, use the describe parameter to view its properties and then delete it.
[root@DSO-DHI-ZK-0001 config]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper DSO-DHI-ZK-0001:2181/gbclst01kakfa --replication-factor 1 --partitions 1 --topic test
Created topic "test".
[root@DSO-DHI-ZK-0001 config]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper DSO-DHI-ZK-0001:2181/gbclst01kakfa --describe --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2 Isr: 2
[root@DSO-DHI-ZK-0001 config]#
[root@DSO-DHI-ZK-0001 config]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper DSO-DHI-ZK-0001:2181/gbclst01kakfa --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Enabling SSL for Kafka
SSL certificates will need to be created for each Kafka server.
SSH to the Kafka server
Create a directory to store the SSL certificates and change to this directory
mkdir -p /kafka/ssl/certs/
cd /kafka/ssl/certs/
Create a keystore using the keytool command. Choose a long and complex password.
export VMFQDN=$(hostname -f)
echo $VMFQDN
# validity is in days, 730 days = 2 years, 1095 days = 3 years
keytool -keystore kafka.server.keystore.jks -alias $VMFQDN -validity 1095 -genkey -dname “CN= ${VMFQDN}, OU=TechDept, O=acme ltd, L=London, ST=London, C=GB”
Generate a Certificate Signing Request (CSR) using the keytool
export VMFQDN=$(hostname -f)
keytool -certreq -keystore kafka.server.keystore.jks -alias $VMFQDN -keyalg rsa -file "${VMFQDN}_kafka_broker.csr" -dname "CN=${VMFQDN}, OU=TechDept, O=acme ltd, L=London, ST=London, C=GB"
The CSR then must be provided to your CA to generate a certificate
Import the certificate into the keystore
keytool -trustcacerts -keystore kafka.server.keystore.jks -alias $VMFQDN -import -file <certificate file>
You will need the Root and Intermediate certificates that are used to generate the certificates for the Kafka broker. These certificates will need to be imported into the truststore.
The command below shows how to export the certificate from the Root CA:
certutil -ca.cert mydomain-ca-cert.crt
The next command shows how to export the certificate from the Intermediate CA:
certutil -ca.cert mydomain-intermediate-ca-cert.crt
The next example shows how to import the Root CA and Intermediate CA into the truststore.
# Import the CA & Intermediate certificate into the truststore
cp mydomain-ca-cert.crt /kafka/ssl/certs/
cp mydomain-intermediate-ca-cert.crt /kafka/ssl/certs/
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file /kafka/ssl/certs/mydomain-ca-cert.crt
keytool -keystore kafka.server.truststore.jks -alias CAItermediate -import -file /kafka/ssl/certs/mydomain-intermediate-ca-cert.crt
You will be prompted to set a password, choose a long and complex password.
Running the commands below should show the CARoot, CAIntermediate and the broker certificate in the truststore and keystore respectively.
keytool -list -keystore kafka.server.truststore.jks
keytool -list -keystore kafka.server.keystore.jks
The Kafka broker configuration must now be updated. First create a backup of the previous configuration file.
export DATETS=$(date +'%F-%R' | tr -d ':')
cp /usr/local/kafka/config/server.properties /usr/local/kafka/config/server.properties.${DATETS}.backup
Make the changes indicated to configuration file indicated below.
Change
listeners = SASL_PLAINTEXT://$VMFQDN:9092
To
listeners = SASL_SSL://$VMFQDN:9092
Where $VMFQDN is the fully qualified domain name of the server
Change
advertised.listeners=SASL_PLAINTEXT://$VMFQDN:9092
To
advertised.listeners=SASL_SSL://$VMFQDN:9092
Where $VMFQDN is the fully qualified domain name of the server
Change
security.inter.broker.protocol=SASL_PLAINTEXT<c/ode>
To
security.inter.broker.protocol=SASL_SSL
Add the following properties
ssl.keystore.location=/kafka/ssl/certs/kafka.server.keystore.jks
ssl.keystore.password=<password>
ssl.key.password==<password>
ssl.truststore.location=/kafka/ssl/certs/kafka.server.truststore.jks
ssl.truststore.password=<password>
Replace <password> with the actual keystore, truststore or SSL cert password as appropriate.
Once ALL Kafka brokers been configured restart them using the systemctl command.
systemctl stop kafka
systemctl status kafka -l
systemctl start kafka
systemctl status kafka -l
To check quickly if the server keystore and truststore are setup properly you can run the following command on each Kafka broker
openssl s_client -debug -connect "$(hostname -I)":9092 -tls1
There may be warnings or errors in the openssl command output about the use of self-signed certificates, this is because openssl does not use the Java truststore we created for verifying certificates. However, you can solve this on CentOS 7.x by placing the certificates to be trusted (in PEM format) in /etc/pki/ca-trust/source/anchors/ and run sudo update-ca-trust.
cd /kafka/ssl/certs
cp mydomain-ca-cert.crt /etc/pki/ca-trust/source/anchors/
cp mydomain-intermediate-ca-cert.crt /etc/pki/ca-trust/source/anchors/
update-ca-trust
Enabling Kafka ACLs
In order to enable Kafka ACLs we must modify the server.properties file on each Kafka broker.
Backup the configuration file first
export DATETS=$(date +'%F-%R' | tr -d ':')
cp /usr/local/kafka/config/server.properties /usr/local/kafka/config/server.properties.${DATETS}.backup
To enable ACLs, we need to configure an authorizer. Kafka provides a simple authorizer implementation, and to use it, you can add the following to server.properties:
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
zookeeper.set.acl=true
#Change the line
secure=false
#To
secure=true
The line “zookeeper.set.acl=true” ensures that only brokers will be able to modify the corresponding znodes for the Kafka metadata stored in ZooKeeper.
One can also add super users in server.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma):
super.users=User:Bob;User:Alice
Refer to the documentation on the Kafka website here http://kafka.apache.org/documentation.html#security_authz_cli for more information on how to manage ACLs.