scala - ¿Cómo obtener la identificación de una tarea de mapa en Spark?
hadoop apache-spark (2)
¿Hay alguna manera de obtener la identificación de una tarea de mapa en Spark? Por ejemplo, si cada tarea de mapeo llama a una función definida por el usuario, ¿puedo obtener el ID de esa tarea de mapeo dentro de esa función definida por el usuario?
Creo que
TaskContext.taskAttemptId
es lo que quieres.
Puede obtener el contexto de la tarea actual dentro de una función a través de
TaskContext.get
.
No estoy seguro de qué quiere decir con ID de tarea de mapa, pero puede acceder a la información de la tarea usando
TaskContext
:
import org.apache.spark.TaskContext
sc.parallelize(Seq[Int](), 4).mapPartitions(_ => {
val ctx = TaskContext.get
val stageId = ctx.stageId
val partId = ctx.partitionId
val hostname = java.net.InetAddress.getLocalHost().getHostName()
Iterator(s"Stage: $stageId, Partition: $partId, Host: $hostname")
}).collect.foreach(println)
Se ha agregado una funcionalidad similar a PySpark en Spark 2.2.0 ( SPARK-18576 ):
from pyspark import TaskContext
import socket
def task_info(*_):
ctx = TaskContext()
return ["Stage: {0}, Partition: {1}, Host: {2}".format(
ctx.stageId(), ctx.partitionId(), socket.gethostname())]
for x in sc.parallelize([], 4).mapPartitions(task_info).collect():
print(x)