
When I try to run this .py script:

import logging

from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType


def create_keyspace(session):
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS spark_streams
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    """)
    print("Keyspace created successfully!")


def create_table(session):
    session.execute("""
        CREATE TABLE IF NOT EXISTS spark_streams.created_users (
            id UUID PRIMARY KEY,
            first_name TEXT,
            last_name TEXT,
            gender TEXT,
            address TEXT,
            post_code TEXT,
            email TEXT,
            username TEXT,
            registered_date TEXT,
            phone TEXT,
            picture TEXT);
    """)
    print("Table created successfully!")


def insert_data(session, **kwargs):
    print("inserting data...")
    user_id = kwargs.get('id')
    first_name = kwargs.get('first_name')
    last_name = kwargs.get('last_name')
    gender = kwargs.get('gender')
    address = kwargs.get('address')
    postcode = kwargs.get('post_code')
    email = kwargs.get('email')
    username = kwargs.get('username')
    dob = kwargs.get('dob')
    registered_date = kwargs.get('registered_date')
    phone = kwargs.get('phone')
    picture = kwargs.get('picture')

    try:
        session.execute("""
            INSERT INTO spark_streams.created_users(id, first_name, last_name, gender, address,
                post_code, email, username, dob, registered_date, phone, picture)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (user_id, first_name, last_name, gender, address,
              postcode, email, username, dob, registered_date, phone, picture))
        logging.info(f"Data inserted for {first_name} {last_name}")
    except Exception as e:
        logging.error(f'could not insert data due to {e}')


def create_spark_connection():
    s_conn = None
    try:
        s_conn = SparkSession.builder \
            .appName('SparkDataStreaming') \
            .config('spark.jars.packages', "com.datastax.spark:spark-cassandra-connector_2.13:3.4.1,"
                                           "org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.1") \
            .config('spark.cassandra.connection.host', 'localhost') \
            .getOrCreate()
        s_conn.sparkContext.setLogLevel("ERROR")
        logging.info("Spark connection created successfully!")
    except Exception as e:
        logging.error(f"Couldn't create the spark session due to exception {e}")
    return s_conn


def connect_to_kafka(spark_conn):
    spark_df = None
    try:
        spark_df = spark_conn.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', 'localhost:9092') \
            .option('subscribe', 'users_created') \
            .option('startingOffsets', 'earliest') \
            .load()
        logging.info("kafka dataframe created successfully")
    except Exception as e:
        logging.warning(f"kafka dataframe could not be created because: {e}")
    return spark_df


def create_cassandra_connection():
    try:
        # connecting to the cassandra cluster
        cluster = Cluster(['localhost'])
        cas_session = cluster.connect()
        return cas_session
    except Exception as e:
        logging.error(f"Could not create cassandra connection due to {e}")
        return None


def create_selection_df_from_kafka(spark_df):
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("post_code", StringType(), False),
        StructField("email", StringType(), False),
        StructField("username", StringType(), False),
        StructField("registered_date", StringType(), False),
        StructField("phone", StringType(), False),
        StructField("picture", StringType(), False)
    ])

    sel = spark_df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col('value'), schema).alias('data')).select("data.*")
    print(sel)
    return sel


if __name__ == "__main__":
    # create spark connection
    spark_conn = create_spark_connection()

    if spark_conn is not None:
        # connect to kafka with spark connection
        spark_df = connect_to_kafka(spark_conn)
        selection_df = create_selection_df_from_kafka(spark_df)
        session = create_cassandra_connection()

        if session is not None:
            create_keyspace(session)
            create_table(session)

            logging.info("Streaming is being started...")

            streaming_query = (selection_df.writeStream.format("org.apache.spark.sql.cassandra")
                               .option('checkpointLocation', '/tmp/checkpoint')
                               .option('keyspace', 'spark_streams')
                               .option('table', 'created_users')
                               .start())

            streaming_query.awaitTermination()

I get this issue:

(.venv) fran@fran-VirtualBox:~/PycharmProjects/data-engineering$ python spark_stream.py 
24/05/12 18:54:01 WARN Utils: Your hostname, fran-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/05/12 18:54:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/fran/PycharmProjects/data-engineering/.venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/fran/.ivy2/cache
The jars for the packages stored in: /home/fran/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.13 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8a294b11-ad47-470c-8f84-295222264cee;1.0
 confs: [default]
 found com.datastax.spark#spark-cassandra-connector_2.13;3.4.1 in central
 found com.datastax.spark#spark-cassandra-connector-driver_2.13;3.4.1 in central
 found org.scala-lang.modules#scala-collection-compat_2.13;2.11.0 in central
 found org.scala-lang.modules#scala-parallel-collections_2.13;1.0.4 in central
 found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
 found com.datastax.oss#native-protocol;1.5.0 in central
 found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
 found com.typesafe#config;1.4.1 in central
 found org.slf4j#slf4j-api;1.7.26 in central
 found io.dropwizard.metrics#metrics-core;4.1.18 in central
 found org.hdrhistogram#HdrHistogram;2.1.12 in central
 found org.reactivestreams#reactive-streams;1.0.3 in central
 found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
 found com.github.spotbugs#spotbugs-annotations;3.1.12 in central
 found com.google.code.findbugs#jsr305;3.0.2 in central
 found com.datastax.oss#java-driver-mapper-runtime;4.13.0 in central
 found com.datastax.oss#java-driver-query-builder;4.13.0 in central
 found org.apache.commons#commons-lang3;3.10 in central
 found com.thoughtworks.paranamer#paranamer;2.8 in central
 found org.scala-lang#scala-reflect;2.13.11 in central
 found org.apache.spark#spark-sql-kafka-0-10_2.13;3.4.1 in central
 found org.apache.spark#spark-token-provider-kafka-0-10_2.13;3.4.1 in central
 found org.apache.kafka#kafka-clients;3.3.2 in central
 found org.lz4#lz4-java;1.8.0 in central
 found org.xerial.snappy#snappy-java;1.1.10.1 in central
 found org.slf4j#slf4j-api;2.0.6 in central
 found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
 found org.apache.hadoop#hadoop-client-api;3.3.4 in central
 found commons-logging#commons-logging;1.1.3 in central
 found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 2530ms :: artifacts dl 40ms
 :: modules in use:
 com.datastax.oss#java-driver-core-shaded;4.13.0 from central in [default]
 com.datastax.oss#java-driver-mapper-runtime;4.13.0 from central in [default]
 com.datastax.oss#java-driver-query-builder;4.13.0 from central in [default]
 com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 from central in [default]
 com.datastax.oss#native-protocol;1.5.0 from central in [default]
 com.datastax.spark#spark-cassandra-connector-driver_2.13;3.4.1 from central in [default]
 com.datastax.spark#spark-cassandra-connector_2.13;3.4.1 from central in [default]
 com.github.spotbugs#spotbugs-annotations;3.1.12 from central in [default]
 com.github.stephenc.jcip#jcip-annotations;1.0-1 from central in [default]
 com.google.code.findbugs#jsr305;3.0.2 from central in [default]
 com.thoughtworks.paranamer#paranamer;2.8 from central in [default]
 com.typesafe#config;1.4.1 from central in [default]
 commons-logging#commons-logging;1.1.3 from central in [default]
 io.dropwizard.metrics#metrics-core;4.1.18 from central in [default]
 org.apache.commons#commons-lang3;3.10 from central in [default]
 org.apache.commons#commons-pool2;2.11.1 from central in [default]
 org.apache.hadoop#hadoop-client-api;3.3.4 from central in [default]
 org.apache.hadoop#hadoop-client-runtime;3.3.4 from central in [default]
 org.apache.kafka#kafka-clients;3.3.2 from central in [default]
 org.apache.spark#spark-sql-kafka-0-10_2.13;3.4.1 from central in [default]
 org.apache.spark#spark-token-provider-kafka-0-10_2.13;3.4.1 from central in [default]
 org.hdrhistogram#HdrHistogram;2.1.12 from central in [default]
 org.lz4#lz4-java;1.8.0 from central in [default]
 org.reactivestreams#reactive-streams;1.0.3 from central in [default]
 org.scala-lang#scala-reflect;2.13.11 from central in [default]
 org.scala-lang.modules#scala-collection-compat_2.13;2.11.0 from central in [default]
 org.scala-lang.modules#scala-parallel-collections_2.13;1.0.4 from central in [default]
 org.slf4j#slf4j-api;2.0.6 from central in [default]
 org.xerial.snappy#snappy-java;1.1.10.1 from central in [default]
 :: evicted modules:
 org.slf4j#slf4j-api;1.7.26 by [org.slf4j#slf4j-api;2.0.6] in [default]
 com.google.code.findbugs#jsr305;3.0.0 by [com.google.code.findbugs#jsr305;3.0.2] in [default]
 ---------------------------------------------------------------------
 | | modules || artifacts |
 | conf | number| search|dwnlded|evicted|| number|dwnlded|
 ---------------------------------------------------------------------
 | default | 31 | 0 | 0 | 2 || 29 | 0 |
 ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-8a294b11-ad47-470c-8f84-295222264cee
 confs: [default]
 0 artifacts copied, 29 already retrieved (0kB/54ms)
24/05/12 18:54:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
WARNING:root:kafka dataframe could not be created because: An error occurred while calling o36.load.
: java.lang.NoClassDefFoundError: scala/$less$colon$less
 at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)
 at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
 at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)
 at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
 at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
 at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
 at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:168)
 at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:144)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:566)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
 at py4j.Gateway.invoke(Gateway.java:282)
 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
 at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
 at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less
 at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
 at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
 at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
 ... 20 more
Traceback (most recent call last):
 File "/home/fran/PycharmProjects/data-engineering/spark_stream.py", line 143, in <module>
 selection_df = create_selection_df_from_kafka(spark_df)
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/home/fran/PycharmProjects/data-engineering/spark_stream.py", line 129, in create_selection_df_from_kafka
 sel = spark_df.selectExpr("CAST(value AS STRING)") \
 ^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'selectExpr'

I think this "**ClassNotFoundException for scala.$less$colon$less"**is a problem of the Scala versión used by Spark:

Spark version 3.4.1
Using Scala version 2.12.17, OpenJDK 64-Bit Server VM, 11.0.22

This may be different from the Scala version used by Kafka (notice I am using the kafka-python package with Python 3.11).

Could this be the problem? How can I align both Scala versions?
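For reference, the Scala build can also be checked from inside a PySpark session. This is only a quick diagnostic sketch: spark.version is public API, but the _jvm handle on the last line is py4j-internal, so don't rely on it in real code:

from pyspark.sql import SparkSession

# Minimal session just to inspect versions (no extra packages needed for this check)
spark = SparkSession.builder.appName('version-check').getOrCreate()
print(spark.version)  # e.g. 3.4.1
# Scala version of the JVM-side runtime, via the internal py4j gateway:
print(spark.sparkContext._jvm.scala.util.Properties.versionString())  # e.g. "version 2.12.17"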

asked May 12, 2024 at 17:00
Comment: Not sure why, but you have a mix of Scala versions in the dependencies. Look at the suffix _2.12 vs. _2.13. (May 13, 2024 at 5:23)

1 Answer


Per the output "Using Scala version 2.12.17", you'll need to use Spark dependencies built for Scala 2.12 rather than 2.13, such as the Cassandra and Kafka connectors. (The later AttributeError on spark_df.selectExpr is only a downstream symptom: connect_to_kafka swallows the NoClassDefFoundError and returns None.)
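For example, the packages config in create_spark_connection would look something like this (a sketch, assuming the pip-installed pyspark 3.4.1, which ships with Scala 2.12; only the _2.13 suffixes change to _2.12):

from pyspark.sql import SparkSession

s_conn = SparkSession.builder \
    .appName('SparkDataStreaming') \
    .config('spark.jars.packages',
            "com.datastax.spark:spark-cassandra-connector_2.12:3.4.1,"
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1") \
    .config('spark.cassandra.connection.host', 'localhost') \
    .getOrCreate()

The same coordinates can also be passed at launch time with spark-submit --packages instead of hard-coding them in the builder.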

"notice I am using the kafka-python package with Python 3.11"

That's a native Python package, so it doesn't use Scala at all, and it's generally not needed here: Spark reads and writes Kafka through its own Java client.

answered May 12, 2024 at 17:11
