Stream a Kafka topic to Hive
Apache Kafka is an open source distributed streaming platform for real-time data pipelines and data integration. It provides an efficient and scalable streaming system for use in a variety of applications, including:
- Real-time analytics
- Stream processing
- Log aggregation
- Distributed messaging
- Event streaming
Objectives
Install Kafka on a Dataproc HA cluster with ZooKeeper (referred to in this tutorial as a "Dataproc Kafka cluster").
Create fictitious customer data, then publish the data to a Kafka topic.
Create Hive parquet and ORC tables in Cloud Storage to receive streamed Kafka topic data.
Submit a PySpark job to subscribe to and stream the Kafka topic into Cloud Storage in parquet and ORC format.
Run a query on the streamed Hive table data to count the streamed Kafka messages.
Costs
In this document, you use billable components of Google Cloud, including Dataproc and Cloud Storage.
To generate a cost estimate based on your projected usage, use the pricing calculator.
When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.
Before you begin
If you haven't already done so, create a Google Cloud project.
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get 300ドル in free credits to run, test, and deploy workloads.
- In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
  Roles required to select or create a project
  - Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
  - Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
- Verify that billing is enabled for your Google Cloud project.
- Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
  Roles required to enable APIs
  To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create.
- On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
- In the Get started section, do the following:
  - Enter a globally unique name that meets the bucket naming requirements.
  - To add a bucket label, expand the Labels section, click Add label, and specify a key and a value for your label.
- In the Choose where to store your data section, do the following:
  - Select a Location type.
  - Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
  - If you select the dual-region location type, you can also choose to enable turbo replication by using the relevant checkbox.
  - To set up cross-bucket replication, select Add cross-bucket replication via Storage Transfer Service and follow these steps:
    - In the Bucket menu, select a bucket.
    - In the Replication settings section, click Configure to configure settings for the replication job. The Configure cross-bucket replication pane appears.
    - To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
    - To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
    - Click Done.
- In the Choose how to store your data section, do the following:
  - Select a default storage class for the bucket or Autoclass for automatic storage class management of your bucket's data.
  - To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
- In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
- In the Choose how to protect object data section, do the following:
  - Select any of the options under Data protection that you want to set for your bucket:
    - To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
    - To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
    - To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
      - To enable Object Retention Lock, click the Enable object retention checkbox.
      - To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
  - To choose how your object data will be encrypted, expand the Data encryption section, and select a Data encryption method.
- Click Create.
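If you prefer to create the bucket programmatically rather than in the console, the following is a minimal sketch that uses the google-cloud-storage Python client library. The bucket name, project ID, and location shown are tutorial placeholders; substitute your own values.

# Sketch: create the tutorial bucket with the google-cloud-storage client.
# BUCKET_NAME, PROJECT_ID, and the location are placeholders.
from google.cloud import storage

client = storage.Client(project="PROJECT_ID")
bucket = client.bucket("BUCKET_NAME")
bucket.storage_class = "STANDARD"  # default storage class for the bucket
new_bucket = client.create_bucket(bucket, location="us-central1")  # choose a nearby region
print(f"Created bucket {new_bucket.name} in {new_bucket.location}")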
Tutorial steps
Perform the following steps to create a Dataproc Kafka cluster and read a Kafka topic into Cloud Storage in parquet or ORC format.
Copy the Kafka installation script to Cloud Storage
The kafka.sh initialization action script installs Kafka on a Dataproc cluster.
Browse the code.
#!/bin/bash
# Copyright 2015 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This script installs Apache Kafka (http://kafka.apache.org) on a Google Cloud
# Dataproc cluster.

set -euxo pipefail

readonly ZOOKEEPER_HOME=/usr/lib/zookeeper
readonly KAFKA_HOME=/usr/lib/kafka
readonly KAFKA_PROP_FILE='/etc/kafka/conf/server.properties'
readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
readonly RUN_ON_MASTER="$(/usr/share/google/get_metadata_value attributes/run-on-master || echo false)"
readonly KAFKA_ENABLE_JMX="$(/usr/share/google/get_metadata_value attributes/kafka-enable-jmx || echo false)"
readonly KAFKA_JMX_PORT="$(/usr/share/google/get_metadata_value attributes/kafka-jmx-port || echo 9999)"
readonly INSTALL_KAFKA_PYTHON="$(/usr/share/google/get_metadata_value attributes/install-kafka-python || echo false)"

# The first ZooKeeper server address, e.g., "cluster1-m-0:2181".
ZOOKEEPER_ADDRESS=''
# Integer broker ID of this node, e.g., 0
BROKER_ID=''

function retry_apt_command() {
  cmd="1ドル"
  for ((i = 0; i < 10; i++)); do
    if eval "$cmd"; then
      return 0
    fi
    sleep 5
  done
  return 1
}

function recv_keys() {
  retry_apt_command "apt-get install -y gnupg2 &&\
    apt-key adv --keyserver keyserver.ubuntu.com --recv-keys B7B3B788A8D3785C"
}

function update_apt_get() {
  retry_apt_command "apt-get update"
}

function install_apt_get() {
  pkgs="$@"
  retry_apt_command "apt-get install -y $pkgs"
}

function err() {
  echo "[$(date +'%Y-%m-%dT%H:%M:%S%z')]: $@" >&2
  return 1
}

# Returns the list of broker IDs registered in ZooKeeper, e.g., " 0, 2, 1,".
function get_broker_list() {
  ${KAFKA_HOME}/bin/zookeeper-shell.sh "${ZOOKEEPER_ADDRESS}" \
    <<<"ls /brokers/ids" |
    grep '\[.*\]' |
    sed 's/\[/ /' |
    sed 's/\]/,/'
}

# Waits for zookeeper to be up or time out.
function wait_for_zookeeper() {
  for i in {1..20}; do
    if "${ZOOKEEPER_HOME}/bin/zkCli.sh" -server "${ZOOKEEPER_ADDRESS}" ls /; then
      return 0
    else
      echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}, retry ${i}..."
      sleep 5
    fi
  done
  echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}" >&2
  exit 1
}

# Wait until the current broker is registered or time out.
function wait_for_kafka() {
  for i in {1..20}; do
    local broker_list=$(get_broker_list || true)
    if [[ "${broker_list}" == *" ${BROKER_ID},"* ]]; then
      return 0
    else
      echo "Kafka broker ${BROKER_ID} is not registered yet, retry ${i}..."
      sleep 5
    fi
  done
  echo "Failed to start Kafka broker ${BROKER_ID}." >&2
  exit 1
}

function install_and_configure_kafka_server() {
  # Find zookeeper list first, before attempting any installation.
  local zookeeper_client_port
  zookeeper_client_port=$(grep 'clientPort' /etc/zookeeper/conf/zoo.cfg |
    tail -n 1 |
    cut -d '=' -f 2)

  local zookeeper_list
  zookeeper_list=$(grep '^server\.' /etc/zookeeper/conf/zoo.cfg |
    cut -d '=' -f 2 |
    cut -d ':' -f 1 |
    sort |
    uniq |
    sed "s/$/:${zookeeper_client_port}/" |
    xargs echo |
    sed "s/ /,/g")

  if [[ -z "${zookeeper_list}" ]]; then
    # Didn't find zookeeper quorum in zoo.cfg, but possibly workers just didn't
    # bother to populate it. Check if YARN HA is configured.
    zookeeper_list=$(bdconfig get_property_value --configuration_file \
      /etc/hadoop/conf/yarn-site.xml \
      --name yarn.resourcemanager.zk-address 2>/dev/null)
  fi

  # If all attempts failed, error out.
  if [[ -z "${zookeeper_list}" ]]; then
    err 'Failed to find configured Zookeeper list; try "--num-masters=3" for HA'
  fi

  ZOOKEEPER_ADDRESS="${zookeeper_list%%,*}"

  # Install Kafka from Dataproc distro.
  install_apt_get kafka-server || dpkg -l kafka-server ||
    err 'Unable to install and find kafka-server.'

  mkdir -p /var/lib/kafka-logs
  chown kafka:kafka -R /var/lib/kafka-logs

  if [[ "${ROLE}" == "Master" ]]; then
    # For master nodes, broker ID starts from 10,000.
    if [[ "$(hostname)" == *-m ]]; then
      # non-HA
      BROKER_ID=10000
    else
      # HA
      BROKER_ID=$((10000 + $(hostname | sed 's/.*-m-\([0-9]*\)$/1円/g')))
    fi
  else
    # For worker nodes, broker ID is a random number generated less than 10000.
    # 10000 is choosen since the max broker ID allowed being set is 10000.
    BROKER_ID=$((RANDOM % 10000))
  fi
  sed -i 's|log.dirs=/tmp/kafka-logs|log.dirs=/var/lib/kafka-logs|' \
    "${KAFKA_PROP_FILE}"
  sed -i 's|^\(zookeeper\.connect=\).*|1円'${zookeeper_list}'|' \
    "${KAFKA_PROP_FILE}"
  sed -i 's,^\(broker\.id=\).*,1円'${BROKER_ID}',' \
    "${KAFKA_PROP_FILE}"
  echo -e '\nreserved.broker.max.id=100000' >>"${KAFKA_PROP_FILE}"
  echo -e '\ndelete.topic.enable=true' >>"${KAFKA_PROP_FILE}"

  if [[ "${KAFKA_ENABLE_JMX}" == "true" ]]; then
    sed -i '/kafka-run-class.sh/i export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Djava.net.preferIPv4Stack=true"' /usr/lib/kafka/bin/kafka-server-start.sh
    sed -i "/kafka-run-class.sh/i export JMX_PORT=${KAFKA_JMX_PORT}" /usr/lib/kafka/bin/kafka-server-start.sh
  fi

  wait_for_zookeeper

  # Start Kafka.
  service kafka-server restart

  wait_for_kafka
}

function install_kafka_python_package() {
  KAFKA_PYTHON_PACKAGE="kafka-python==2.0.2"
  if [[ "${INSTALL_KAFKA_PYTHON}" != "true" ]]; then
    return
  fi

  if [[ "$(echo "${DATAPROC_IMAGE_VERSION} > 2.0" | bc)" -eq 1 ]]; then
    /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}"; }
  else
    OS=$(. /etc/os-release && echo "${ID}")
    if [[ "${OS}" == "rocky" ]]; then
      yum install -y python2-pip
    else
      apt-get install -y python-pip
    fi
    pip2 install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; pip2 install "${KAFKA_PYTHON_PACKAGE}"; } || { sleep 10; pip install "${KAFKA_PYTHON_PACKAGE}"; }
  fi
}

function remove_old_backports {
  # This script uses 'apt-get update' and is therefore potentially dependent on
  # backports repositories which have been archived. In order to mitigate this
  # problem, we will remove any reference to backports repos older than oldstable
  # https://github.com/GoogleCloudDataproc/initialization-actions/issues/1157
  oldstable=$(curl -s https://deb.debian.org/debian/dists/oldstable/Release | awk '/^Codename/ {print 2ドル}')
  stable=$(curl -s https://deb.debian.org/debian/dists/stable/Release | awk '/^Codename/ {print 2ドル}')

  matched_files="$(grep -rsil '\-backports' /etc/apt/sources.list*)"
  if [[ -n "$matched_files" ]]; then
    for filename in "$matched_files"; do
      grep -e "$oldstable-backports" -e "$stable-backports" "$filename" ||
        sed -i -e 's/^.*-backports.*$//' "$filename"
    done
  fi
}

function main() {
  OS=$(. /etc/os-release && echo "${ID}")
  if [[ ${OS} == debian ]] && [[ $(echo "${DATAPROC_IMAGE_VERSION} <= 2.1" | bc -l) == 1 ]]; then
    remove_old_backports
  fi

  recv_keys || err 'Unable to receive keys.'
  update_apt_get || err 'Unable to update packages lists.'
  install_kafka_python_package

  # Only run the installation on workers; verify zookeeper on master(s).
  if [[ "${ROLE}" == 'Master' ]]; then
    service zookeeper-server status ||
      err 'Required zookeeper-server not running on master!'
    if [[ "${RUN_ON_MASTER}" == "true" ]]; then
      # Run installation on masters.
      install_and_configure_kafka_server
    else
      # On master nodes, just install kafka command-line tools and libs but not
      # kafka-server.
      install_apt_get kafka ||
        err 'Unable to install kafka libraries on master!'
    fi
  else
    # Run installation on workers.
    install_and_configure_kafka_server
  fi
}

main
Copy the kafka.sh initialization action script to your Cloud Storage bucket. This script installs Kafka on a Dataproc cluster. Open Cloud Shell, then run the following command:
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
Make the following replacements:
- REGION: kafka.sh is stored in public regionally tagged buckets in Cloud Storage. Specify a geographically close Compute Engine region (example: us-central1).
- BUCKET_NAME: The name of your Cloud Storage bucket.
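As an alternative to the gcloud command, you can copy the script with the Cloud Storage Python client. This is only a sketch; REGION and BUCKET_NAME are the same placeholders used above.

# Sketch: copy kafka.sh from the public regional bucket into your bucket with
# the google-cloud-storage client. Replace REGION and BUCKET_NAME with your values.
from google.cloud import storage

client = storage.Client()
src_bucket = client.bucket("goog-dataproc-initialization-actions-REGION")
dst_bucket = client.bucket("BUCKET_NAME")
src_blob = src_bucket.blob("kafka/kafka.sh")

# copy_blob copies the object server side; no local download is needed.
src_bucket.copy_blob(src_blob, dst_bucket, "scripts/kafka.sh")
print("Copied to gs://BUCKET_NAME/scripts/kafka.sh")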
Create a Dataproc Kafka cluster
Open Cloud Shell, then run the following gcloud dataproc clusters create command to create a Dataproc HA cluster that installs the Kafka and ZooKeeper components:

gcloud dataproc clusters create KAFKA_CLUSTER \
    --project=PROJECT_ID \
    --region=REGION \
    --image-version=2.1-debian11 \
    --num-masters=3 \
    --enable-component-gateway \
    --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
Notes:
- KAFKA_CLUSTER: The cluster name, which must be unique within a project. The name must start with a lowercase letter, and can contain up to 51 lowercase letters, numbers, and hyphens. It cannot end with a hyphen. The name of a deleted cluster can be reused.
- PROJECT_ID: The project to associate with this cluster.
- REGION: The Compute Engine region where the cluster will be located, such as us-central1.
  - You can add the optional --zone=ZONE flag to specify a zone within the specified region, such as us-central1-a. If you do not specify a zone, the Dataproc autozone placement feature selects a zone within the specified region.
- --image-version: Dataproc image version 2.1-debian11 is recommended for this tutorial. Note: Each image version contains a set of pre-installed components, including the Hive component used in this tutorial (see Supported Dataproc image versions).
- --num-masters: 3 master nodes create an HA cluster. The ZooKeeper component, which is required by Kafka, is pre-installed on an HA cluster.
- --enable-component-gateway: Enables the Dataproc Component Gateway.
- BUCKET_NAME: The name of your Cloud Storage bucket that contains the /scripts/kafka.sh initialization script (see Copy the Kafka installation script to Cloud Storage).
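If you want to create the cluster from code instead of Cloud Shell, the following sketch uses the google-cloud-dataproc Python client library and mirrors the gcloud flags shown above. The placeholder names are assumptions carried over from this tutorial.

# Sketch: create the Dataproc Kafka HA cluster with the Python client library.
from google.cloud import dataproc_v1

region = "REGION"  # placeholder, such as us-central1
client = dataproc_v1.ClusterControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

cluster = {
    "project_id": "PROJECT_ID",
    "cluster_name": "KAFKA_CLUSTER",
    "config": {
        "master_config": {"num_instances": 3},  # 3 masters = HA cluster with ZooKeeper
        "software_config": {"image_version": "2.1-debian11"},
        "endpoint_config": {"enable_http_port_access": True},  # Component Gateway
        "initialization_actions": [
            {"executable_file": "gs://BUCKET_NAME/scripts/kafka.sh"}
        ],
    },
}

operation = client.create_cluster(
    request={"project_id": "PROJECT_ID", "region": region, "cluster": cluster}
)
print("Created cluster:", operation.result().cluster_name)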
Create a Kafka custdata topic
To create a Kafka topic on the Dataproc Kafka cluster:
Use the SSH utility to open a terminal window on the cluster master VM.
Create a Kafka custdata topic:

/usr/lib/kafka/bin/kafka-topics.sh \
    --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
    --create --topic custdata
Notes:
- KAFKA_CLUSTER: Insert the name of your Kafka cluster.
- -w-0:9092 signifies the Kafka broker running on port 9092 on the worker-0 node.

You can run the following commands after creating the custdata topic:

# List all topics.
/usr/lib/kafka/bin/kafka-topics.sh \
    --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
    --list

# Consume then display topic data.
/usr/lib/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
    --topic custdata

# Count the number of messages in the topic.
/usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata

# Delete topic.
/usr/lib/kafka/bin/kafka-topics.sh \
    --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
    --delete --topic custdata
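If you install the kafka-python package (covered later in this tutorial), you can also create and inspect the topic from Python instead of the shell tools above. A minimal sketch, assuming the same KAFKA_CLUSTER-w-0:9092 broker address:

# Sketch: manage the custdata topic with kafka-python instead of kafka-topics.sh.
# Assumes kafka-python is installed and the broker address below is reachable.
from kafka.admin import KafkaAdminClient, NewTopic

BOOTSTRAP = "KAFKA_CLUSTER-w-0:9092"  # placeholder cluster name, as in the tutorial

admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP)

# Create the custdata topic (equivalent to kafka-topics.sh --create --topic custdata).
admin.create_topics([NewTopic(name="custdata", num_partitions=1, replication_factor=1)])

# List all topics (equivalent to kafka-topics.sh --list).
print(admin.list_topics())

admin.close()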
Publish content to the Kafka custdata topic
The following script uses the kafka-console-producer.sh Kafka tool to generate fictitious customer data in CSV format.
Copy, then paste the script in the SSH terminal on the master node of your Kafka cluster. Press <return> to run the script.
for i in {1..10000}; do \
custname="cust name${i}"
uuid=$(dbus-uuidgen)
age=$((45 + $RANDOM % 45))
amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))")
message="${uuid}:${custname},${age},${amount}"
echo ${message}
done | /usr/lib/kafka/bin/kafka-console-producer.sh \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata \
    --property "parse.key=true" \
    --property "key.separator=:"
Notes:
- KAFKA_CLUSTER: The name of your Kafka cluster.
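The console-producer loop above is the tutorial's method. As a rough Python equivalent, the sketch below publishes the same kind of keyed CSV records with kafka-python; the generated field values are illustrative, not identical to the bash script's output.

# Sketch: publish 10,000 keyed CSV records to custdata with kafka-python.
# Mirrors the bash loop above: key is a UUID, value is "custname,age,amount".
import random
import uuid

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="KAFKA_CLUSTER-w-0:9092",  # placeholder cluster name
    key_serializer=str.encode,
    value_serializer=str.encode,
)

for i in range(1, 10001):
    key = str(uuid.uuid4())
    value = "cust name{},{},{}.{}".format(
        i,
        45 + random.randrange(45),   # age
        random.randrange(99999),     # amount, dollars
        random.randrange(99),        # amount, cents
    )
    producer.send("custdata", key=key, value=value)

producer.flush()
producer.close()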
Run the following Kafka command to confirm that the custdata topic contains 10,000 messages:

/usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata
Notes:
- KAFKA_CLUSTER: The name of your Kafka cluster.
Expected output:
custdata:0:10000
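You can perform the same check from Python. The sketch below reads the end offset of partition 0 with kafka-python, which should report 10000 after the producer loop completes.

# Sketch: count messages in custdata by reading the partition end offset,
# roughly equivalent to kafka.tools.GetOffsetShell above.
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="KAFKA_CLUSTER-w-0:9092")  # placeholder
partition = TopicPartition("custdata", 0)

# end_offsets returns {TopicPartition: offset}; with a single partition the
# end offset equals the total number of messages published.
end_offsets = consumer.end_offsets([partition])
print("custdata message count:", end_offsets[partition])

consumer.close()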
Create Hive tables in Cloud Storage
Create Hive tables to receive streamed Kafka topic data.
Perform the following steps to create cust_parquet (parquet) and cust_orc (ORC) Hive tables in your Cloud Storage bucket.
Insert your BUCKET_NAME in the following script, copy and paste the script into the SSH terminal on your Kafka cluster master node, and then press <return> to create a ~/hivetables.hql (Hive Query Language) script. You will run the ~/hivetables.hql script in the next step to create parquet and ORC Hive tables in your Cloud Storage bucket.

cat > ~/hivetables.hql <<EOF
drop table if exists cust_parquet;
create external table if not exists cust_parquet
(uuid string, custname string, age string, amount string)
row format delimited fields terminated by ','
stored as parquet
location "gs://BUCKET_NAME/tables/cust_parquet";

drop table if exists cust_orc;
create external table if not exists cust_orc
(uuid string, custname string, age string, amount string)
row format delimited fields terminated by ','
stored as orc
location "gs://BUCKET_NAME/tables/cust_orc";
EOF
In the SSH terminal on the master node of your Kafka cluster, submit the ~/hivetables.hql Hive job to create the cust_parquet (parquet) and cust_orc (ORC) Hive tables in your Cloud Storage bucket:

gcloud dataproc jobs submit hive \
    --cluster=KAFKA_CLUSTER \
    --region=REGION \
    -f ~/hivetables.hql
Notes:
- The Hive component is pre-installed on the Dataproc Kafka cluster. See 2.1.x release versions for a list of the Hive component versions included in recently released 2.1 images.
- KAFKA_CLUSTER: The name of your Kafka cluster.
- REGION: The region where your Kafka cluster is located.
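The gcloud command above submits the local HQL file directly. If you would rather submit the Hive job with the Dataproc Python client, the sketch below assumes you first upload hivetables.hql to your bucket (for example, gs://BUCKET_NAME/scripts/hivetables.hql), because the client library takes a Cloud Storage URI rather than a local file.

# Sketch: submit the Hive job with the google-cloud-dataproc client library.
# Assumes hivetables.hql has been uploaded to gs://BUCKET_NAME/scripts/hivetables.hql.
from google.cloud import dataproc_v1

region = "REGION"  # placeholder, such as us-central1
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

job = {
    "placement": {"cluster_name": "KAFKA_CLUSTER"},
    "hive_job": {"query_file_uri": "gs://BUCKET_NAME/scripts/hivetables.hql"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": "PROJECT_ID", "region": region, "job": job}
)
result = operation.result()  # blocks until the Hive job finishes
print("Job finished:", result.reference.job_id)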
Stream Kafka custdata to Hive tables
- Run the following command in the SSH terminal on the master node of your Kafka cluster to install the kafka-python library. A Kafka client is needed to stream Kafka topic data to Cloud Storage.

  pip install kafka-python
Insert your BUCKET_NAME, copy and paste the following PySpark code into the SSH terminal on your Kafka cluster master node, and then press <return> to create a streamdata.py file.

The script subscribes to the Kafka custdata topic, then streams the data to your Hive tables in Cloud Storage. The output format, which can be parquet or ORC, is passed into the script as a parameter.

cat > streamdata.py <<EOF
#!/bin/python

import sys
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from kafka import KafkaConsumer

def getNameFn(data): return data.split(",")[0]
def getAgeFn(data):  return data.split(",")[1]
def getAmtFn(data):  return data.split(",")[2]

def main(cluster, outputfmt):
    spark = SparkSession.builder.appName("APP").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    Logger = spark._jvm.org.apache.log4j.Logger
    logger = Logger.getLogger(__name__)

    rows = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \
        .option("startingOffsets", "earliest") \
        .load()

    getNameUDF = udf(getNameFn, StringType())
    getAgeUDF  = udf(getAgeFn, StringType())
    getAmtUDF  = udf(getAmtFn, StringType())

    logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt)

    query = rows.select(col("key").cast("string").alias("uuid"), \
        getNameUDF(col("value").cast("string")).alias("custname"), \
        getAgeUDF(col("value").cast("string")).alias("age"), \
        getAmtUDF(col("value").cast("string")).alias("amount"))

    writer = query.writeStream.format(outputfmt) \
        .option("path", "gs://BUCKET_NAME/tables/cust_"+outputfmt) \
        .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \
        .outputMode("append") \
        .start()

    writer.awaitTermination()

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Invalid number of arguments passed ", len(sys.argv))
        print("Usage: ", sys.argv[0], " cluster format")
        print("e.g.:  ", sys.argv[0], " <cluster_name> orc")
        print("e.g.:  ", sys.argv[0], " <cluster_name> parquet")
    main(sys.argv[1], sys.argv[2])
EOF
In the SSH terminal on the master node of your Kafka cluster, run spark-submit to stream data to your Hive tables in Cloud Storage.

Insert the name of your KAFKA_CLUSTER and the output FORMAT, then copy and paste the following code into the SSH terminal on the master node of your Kafka cluster, and then press <return> to run the code and stream the Kafka custdata in the specified format to your Hive tables in Cloud Storage:

spark-submit --packages \
org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \
    --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
    --conf spark.driver.memory=4096m \
    --conf spark.executor.cores=2 \
    --conf spark.executor.instances=2 \
    --conf spark.executor.memory=6144m \
    streamdata.py KAFKA_CLUSTER FORMAT
Notes:
- KAFKA_CLUSTER: Insert the name of your Kafka cluster.
- FORMAT: Specify either parquet or orc as the output format. You can run the command successively to stream both formats to the Hive tables: for example, in the first invocation, specify parquet to stream the Kafka custdata topic to the Hive parquet table; then, in the second invocation, specify orc format to stream custdata to the Hive ORC table.
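Running spark-submit over SSH is the tutorial's approach. As an alternative sketch, you could upload streamdata.py to Cloud Storage and submit it as a Dataproc PySpark job with the same Spark packages and resource settings; the GCS path shown is an assumption.

# Sketch: submit streamdata.py as a Dataproc PySpark job instead of running
# spark-submit over SSH. Assumes the script was uploaded to the bucket first.
from google.cloud import dataproc_v1

region = "REGION"  # placeholder
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

job = {
    "placement": {"cluster_name": "KAFKA_CLUSTER"},
    "pyspark_job": {
        "main_python_file_uri": "gs://BUCKET_NAME/scripts/streamdata.py",
        "args": ["KAFKA_CLUSTER", "parquet"],  # or "orc"
        "properties": {
            "spark.jars.packages": (
                "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,"
                "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3"
            ),
            "spark.driver.memory": "4096m",
            "spark.executor.cores": "2",
            "spark.executor.instances": "2",
            "spark.executor.memory": "6144m",
        },
    },
}

operation = job_client.submit_job_as_operation(
    request={"project_id": "PROJECT_ID", "region": region, "job": job}
)
# The streaming job runs until you cancel it (for example, with gcloud dataproc
# jobs kill); operation.result() would block indefinitely.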
After standard output halts in the SSH terminal, which signifies that all of the custdata has been streamed, press <control-c> in the SSH terminal to stop the process.

List the Hive tables in Cloud Storage:
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
Notes:
- BUCKET_NAME: Insert the name of the Cloud Storage bucket that contains your Hive tables (see Create Hive tables).
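The same listing can be done with the Cloud Storage Python client; a minimal sketch, assuming the tables/ prefix used in this tutorial:

# Sketch: list the streamed table files under the tables/ prefix,
# equivalent to the gcloud storage ls command above.
from google.cloud import storage

client = storage.Client()
for blob in client.list_blobs("BUCKET_NAME", prefix="tables/"):
    print(f"gs://BUCKET_NAME/{blob.name}  ({blob.size} bytes)")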
Query streamed data
In the SSH terminal on the master node of your Kafka cluster, run the following hive command to count the streamed Kafka custdata messages in the Hive tables in Cloud Storage:

hive -e "select count(1) from TABLE_NAME"
Notes:
- TABLE_NAME: Specify either cust_parquet or cust_orc as the Hive table name.

Expected output snippet:
...
Status: Running (Executing on YARN cluster with App id application_....)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
Reducer 2 ...... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)
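As a cross-check on the Hive count, you can read the streamed files directly with PySpark; a sketch, assuming the parquet table path from this tutorial (run it with spark-submit or pyspark on the cluster):

# Sketch: count the streamed records by reading the parquet files directly,
# as a cross-check against the Hive query result above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("count-custdata").getOrCreate()

df = spark.read.parquet("gs://BUCKET_NAME/tables/cust_parquet")  # or .orc(...) for cust_orc
print("Row count:", df.count())

spark.stop()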
Clean up
Delete the project
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Delete resources
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
- Delete your Kafka cluster:

  gcloud dataproc clusters delete KAFKA_CLUSTER \
      --region=REGION