Add YARN as a resource / job manager #50
Conversation
Uses YARN by default, but can optionally use Spark standalone for now. Tested by importing and querying the Fast Genomics Gene table.
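For context, this is roughly what the master selection could look like inside get_spark_session, the helper that appears in the traceback below. A minimal sketch only: the standalone master URL and environment variable are illustrative assumptions, and the real helper in src/spark/utils.py also handles Delta Lake options, timeouts, and executor sizing (per its signature in the traceback).

```python
import os

from pyspark import SparkConf
from pyspark.sql import SparkSession


def get_spark_session(app_name: str = "app", yarn: bool = True) -> SparkSession:
    """Sketch: build a session on YARN by default, Spark standalone otherwise."""
    conf = SparkConf().setAppName(app_name)
    if yarn:
        # On YARN, the resource manager is resolved via HADOOP_CONF_DIR / YARN_CONF_DIR.
        conf.setMaster("yarn")
    else:
        # Hypothetical standalone master URL; replace with your cluster's.
        conf.setMaster(os.getenv("SPARK_MASTER_URL", "spark://spark-master:7077"))
    return SparkSession.builder.config(conf=conf).getOrCreate()
```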
Codecov Report
Attention: Patch coverage is …

Additional details and impacted files

@@            Coverage Diff            @@
##             main      #50     +/-  ##
=========================================
+ Coverage   48.07%   50.00%   +1.92%
=========================================
  Files           4        4
  Lines         104      118      +14
=========================================
+ Hits           50       59       +9
- Misses         54       59       +5

☔ View full report in Codecov by Sentry.
I don't know what to do now.
Code changes look fine. I added platform: linux/amd64 for both YARN images, but I cannot get a Spark session; the attempt hangs with the log below.
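For reference, a compose-style fragment of where that override would sit; the service and image names are illustrative assumptions, not the PR's actual file:

```yaml
# Hypothetical docker-compose fragment; service and image names are placeholders.
services:
  yarn-resourcemanager:
    image: example/hadoop-yarn   # placeholder image
    platform: linux/amd64        # force amd64, e.g. on Apple Silicon hosts
  yarn-nodemanager:
    image: example/hadoop-yarn   # placeholder image
    platform: linux/amd64
```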
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/24 23:37:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/07/24 23:37:50 WARN Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.
ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
File "[/opt/bitnami/python/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038](http://localhost:4042/opt/bitnami/python/lib/python3.11/site-packages/py4j/java_gateway.py#line=1037), in send_command
response = connection.send_command(command)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "[/opt/bitnami/python/lib/python3.11/site-packages/py4j/clientserver.py", line 511](http://localhost:4042/opt/bitnami/python/lib/python3.11/site-packages/py4j/clientserver.py#line=510), in send_command
answer = smart_decode(self.stream.readline()[:-1])
^^^^^^^^^^^^^^^^^^^^^^
File "[/opt/bitnami/python/lib/python3.11/socket.py", line 706](http://localhost:4042/opt/bitnami/python/lib/python3.11/socket.py#line=705), in readinto
return self._sock.recv_into(b)
^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt
---------------------------------------------------------------------------
KeyboardInterrupt Traceback (most recent call last)
Cell In[2], line 1
----> 1 spark = get_spark_session()
3 # remove_lck_files()
4 spark.sql("show tables in test").show()
File ~src/spark/utils.py:143, in get_spark_session(app_name, local, yarn, delta_lake, timeout_sec, executor_cores)
141 for key, value in sc.items():
142 spark_conf.set(key, value)
--> 143 spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
144 timeout_sec = os.getenv('SPARK_TIMEOUT_SECONDS', timeout_sec)
145 Timer(int(timeout_sec), _stop_spark_session, [spark]).start()
File ~opt/bitnami/spark/python/pyspark/sql/session.py:497, in SparkSession.Builder.getOrCreate(self)
495 sparkConf.set(key, value)
496 # This SparkContext may be an existing one.
--> 497 sc = SparkContext.getOrCreate(sparkConf)
498 # Do not update `SparkConf` for existing `SparkContext`, as it's shared
499 # by all sessions.
500 session = SparkSession(sc, options=self._options)
File ~opt/bitnami/spark/python/pyspark/context.py:515, in SparkContext.getOrCreate(cls, conf)
513 with SparkContext._lock:
514 if SparkContext._active_spark_context is None:
--> 515 SparkContext(conf=conf or SparkConf())
516 assert SparkContext._active_spark_context is not None
517 return SparkContext._active_spark_context
File ~opt/bitnami/spark/python/pyspark/context.py:203, in SparkContext.__init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls, udf_profiler_cls, memory_profiler_cls)
201 SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
202 try:
--> 203 self._do_init(
204 master,
205 appName,
206 sparkHome,
207 pyFiles,
208 environment,
209 batchSize,
210 serializer,
211 conf,
212 jsc,
213 profiler_cls,
214 udf_profiler_cls,
215 memory_profiler_cls,
216 )
217 except BaseException:
218 # If an error occurs, clean up in order to allow future SparkContext creation:
219 self.stop()
File ~opt/bitnami/spark/python/pyspark/context.py:296, in SparkContext._do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc, profiler_cls, udf_profiler_cls, memory_profiler_cls)
293 self.environment["PYTHONHASHSEED"] = os.environ.get("PYTHONHASHSEED", "0")
295 # Create the Java SparkContext through Py4J
--> 296 self._jsc = jsc or self._initialize_context(self._conf._jconf)
297 # Reset the SparkConf to the one actually used by the SparkContext in JVM.
298 self._conf = SparkConf(_jconf=self._jsc.sc().conf())
File ~opt/bitnami/spark/python/pyspark/context.py:421, in SparkContext._initialize_context(self, jconf)
417 """
418 Initialize SparkContext in function to allow subclass specific initialization
419 """
420 assert self._jvm is not None
--> 421 return self._jvm.JavaSparkContext(jconf)
File ~opt/bitnami/python/lib/python3.11/site-packages/py4j/java_gateway.py:1586, in JavaClass.__call__(self, *args)
1578 args_command = "".join(
1579 [get_command_part(arg, self._pool) for arg in new_args])
1581 command = proto.CONSTRUCTOR_COMMAND_NAME +\
1582 self._command_header +\
1583 args_command +\
1584 proto.END_COMMAND_PART
-> 1586 answer = self._gateway_client.send_command(command)
1587 return_value = get_return_value(
1588 answer, self._gateway_client, None, self._fqn)
1590 for temp_arg in temp_args:
File ~opt/bitnami/python/lib/python3.11/site-packages/py4j/java_gateway.py:1038, in GatewayClient.send_command(self, command, retry, binary)
1036 connection = self._get_connection()
1037 try:
-> 1038 response = connection.send_command(command)
1039 if binary:
1040 return response, self._create_connection_guard(connection)
File ~opt/bitnami/python/lib/python3.11/site-packages/py4j/clientserver.py:511, in ClientServerConnection.send_command(self, command)
509 try:
510 while True:
--> 511 answer = smart_decode(self.stream.readline()[:-1])
512 logger.debug("Answer received: {0}".format(answer))
513 # Happens when a the other end is dead. There might be an empty
514 # answer before the socket raises an error.
File ~opt/bitnami/python/lib/python3.11/socket.py:706, in SocketIO.readinto(self, b)
704 while True:
705 try:
--> 706 return self._sock.recv_into(b)
707 except timeout:
708 self._timeout_occurred = True
KeyboardInterrupt:
How long did you wait?
At least a few minutes.
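One plausible culprit is the warning at the top of the log: when spark.executor.instances is lower than spark.dynamicAllocation.minExecutors, Spark ignores the setting, and on YARN the context creation can then sit waiting for executors that are never granted. A minimal sketch of a self-consistent configuration, with illustrative numbers only:

```python
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Illustrative values; pick numbers that fit the cluster. The invariant is
# minExecutors <= executor.instances <= maxExecutors.
conf = (
    SparkConf()
    .setMaster("yarn")
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "1")
    .set("spark.executor.instances", "2")
    .set("spark.dynamicAllocation.maxExecutors", "4")
    # Dynamic allocation also needs shuffle tracking or an external shuffle service.
    .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
```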
👍