# -*- coding: utf-8 -*-
from __future__ import print_function

import gzip
import time
import sys

try:
    import cStringIO as StringIO
except ImportError:
    import StringIO

from threading import Lock

from dummy_spark.rdd import RDD

try:
    import tinys3
    has_tinys3 = True
except ImportError:
    has_tinys3 = False

__all__ = ['SparkContext']
__author__ = 'willmcginnis'

class hadoopConfiguration(object):
    """Minimal stand-in for the JVM Hadoop Configuration object: a plain attribute store with set/get semantics."""

    def __init__(self):
        pass

    def set(self, a, b):
        setattr(self, a, b)
        return True

    def get(self, a):
        return getattr(self, a, None)

class jvm(object):
    """Minimal stand-in for the py4j JVM gateway: exposes just enough to back SparkContext.textFile for local and
    s3/s3n paths."""

    def __init__(self):
        self.hc = hadoopConfiguration()

    def hadoopConfiguration(self):
        return self.hc

    def textFile(self, file_name):
        if file_name.startswith('s3'):
            if has_tinys3:
                # strip the scheme and split the remainder into bucket name and key
                file_name = file_name.split('://')[1]
                bucket_name = file_name.split('/')[0]
                key_name = file_name.replace(bucket_name, '')[1:]
                access_key = self.hc.get('fs.s3n.awsAccessKeyId')
                secret_key = self.hc.get('fs.s3n.awsSecretAccessKey')
                conn = tinys3.Connection(access_key, secret_key)
                response = conn.get(key_name, bucket_name)
                if file_name.endswith('.gz'):
                    compressed = StringIO.StringIO(response.content)
                    gzipper = gzip.GzipFile(fileobj=compressed)
                    return gzipper.readlines()
                return response.content.decode('utf-8').split('\n')
            else:
                raise Exception('Need tinys3 installed to use s3 files')
        else:
            # use gzip for .gz files, the plain builtin open otherwise
            opener = gzip.open if file_name.endswith('.gz') else open
            with opener(file_name, 'r') as f:
                return f.readlines()
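
# Sketch of how the s3 branch above is typically driven (bucket, key, and credential strings below are
# placeholders, not real values): the AWS credentials are read from the fake Hadoop configuration under the
# same keys pyspark uses for s3n paths.
#
#   sc = SparkContext()
#   sc._jsc.hadoopConfiguration().set('fs.s3n.awsAccessKeyId', '<ACCESS_KEY>')
#   sc._jsc.hadoopConfiguration().set('fs.s3n.awsSecretAccessKey', '<SECRET_KEY>')
#   rdd = sc.textFile('s3n://my-bucket/path/to/data.txt')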

class SparkContext(object):
    """
    Main entry point for Spark functionality. A SparkContext represents the
    connection to a Spark cluster, and can be used to create L{RDD} and
    broadcast variables on that cluster.
    """

    _gateway = None
    _jvm = None
    _next_accum_id = 0
    _active_spark_context = None
    _lock = Lock()
    _jsc = jvm()
    _python_includes = None  # zip and egg files that need to be added to PYTHONPATH

    PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
    DUMMY_VERSION = 'dummy version'

    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0,
                 serializer=None, conf=None, gateway=None, jsc=None, profiler_cls=None):
        """
        Accepts the same arguments as pyspark's SparkContext for API compatibility; the dummy implementation
        does not use them.

        :param master:
        :param appName:
        :param sparkHome:
        :param pyFiles:
        :param environment:
        :param batchSize:
        :param serializer:
        :param conf:
        :param gateway:
        :param jsc:
        :param profiler_cls:
        :return:
        """
        self._callsite = None
        SparkContext._ensure_initialized(self, gateway=gateway)
        self.start_time = time.time()
        try:
            self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc,
                          profiler_cls)
        except:
            # If an error occurs, clean up in order to allow future SparkContext creation:
            self.stop()
            raise

    def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc,
                 profiler_cls):
        """
        :param master:
        :param appName:
        :param sparkHome:
        :param pyFiles:
        :param environment:
        :param batchSize:
        :param serializer:
        :param conf:
        :param jsc:
        :param profiler_cls:
        :return:
        """
        # TODO: add included files to python path
        return

    def _initialize_context(self, jconf):
        """
        :param jconf:
        :return:
        """
        return None

    @classmethod
    def _ensure_initialized(cls, instance=None, gateway=None):
        """
        :param instance:
        :param gateway:
        :return:
        """
        return True

    def __enter__(self):
        """
        :return:
        """
        return self

    def __exit__(self, type, value, trace):
        """
        :param type:
        :param value:
        :param trace:
        :return:
        """
        self.stop()

    def setLogLevel(self, logLevel):
        """
        :param logLevel:
        :return:
        """
        pass

    @classmethod
    def setSystemProperty(cls, key, value):
        """
        :param key:
        :param value:
        :return:
        """
        pass

    @property
    def version(self):
        """
        :return:
        """
        return self.DUMMY_VERSION

    @property
    def startTime(self):
        """
        :return:
        """
        return self.start_time

    @property
    def defaultParallelism(self):
        """
        :return:
        """
        return 1

    @property
    def defaultMinPartitions(self):
        """
        :return:
        """
        return 1

    def stop(self):
        """
        :return:
        """
        pass

    def emptyRDD(self):
        """
        :return:
        """
        return RDD([], self, None)

    def range(self, start, end=None, step=1, numSlices=None):
        """
        :param start:
        :param end:
        :param step:
        :param numSlices:
        :return:
        """
        # Match pyspark's semantics: with a single argument, treat it as the exclusive end and count from zero.
        if end is None:
            start, end = 0, start
        return RDD(list(range(start, end, step)), self, None)
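
    # Usage sketch (assuming the list-backed RDD exposes pyspark's collect()):
    #
    #   sc = SparkContext()
    #   sc.range(5).collect()         # -> [0, 1, 2, 3, 4]
    #   sc.range(2, 10, 2).collect()  # -> [2, 4, 6, 8]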

    def parallelize(self, c, numSlices=None):
        """
        :param c:
        :param numSlices:
        :return:
        """
        return RDD(c, self, None)
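
    # Sketch: parallelize simply wraps a Python collection in the list-backed RDD; numSlices is accepted for
    # API compatibility but ignored here.
    #
    #   sc = SparkContext()
    #   rdd = sc.parallelize([('a', 1), ('b', 2)])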

    def textFile(self, name, minPartitions=None, use_unicode=True):
        """
        :param name:
        :param minPartitions:
        :param use_unicode:
        :return:
        """
        data = self._jsc.textFile(name)
        rdd = RDD(data, self, None)
        return rdd
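
    # Usage sketch for textFile (the paths are placeholders; gzipped files are decompressed transparently based
    # on the .gz suffix, and s3/s3n URIs are handled via tinys3 as shown near the jvm class above):
    #
    #   sc = SparkContext()
    #   local_rdd = sc.textFile('/tmp/example.txt')
    #   gz_rdd = sc.textFile('/tmp/example.txt.gz')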

    def addPyFile(self, path):
        """
        :param path:
        :return:
        """
        sys.path.append(path)

    def pickleFile(self, name, minPartitions=None):
        """
        NotImplemented
        :param name:
        :param minPartitions:
        :return:
        """
        raise NotImplementedError

    def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
        """
        NotImplemented
        :param path:
        :param minPartitions:
        :param use_unicode:
        :return:
        """
        raise NotImplementedError

    def binaryFiles(self, path, minPartitions=None):
        """
        NotImplemented
        :param path:
        :param minPartitions:
        :return:
        """
        raise NotImplementedError

    def binaryRecords(self, path, recordLength):
        """
        NotImplemented
        :param path:
        :param recordLength:
        :return:
        """
        raise NotImplementedError

    def _dictToJavaMap(self, d):
        """
        NotImplemented
        :param d:
        :return:
        """
        raise NotImplementedError

    def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None,
                     minSplits=None, batchSize=0):
        """
        NotImplemented
        :param path:
        :param keyClass:
        :param valueClass:
        :param keyConverter:
        :param valueConverter:
        :param minSplits:
        :param batchSize:
        :return:
        """
        raise NotImplementedError

    def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None,
                         conf=None, batchSize=0):
        """
        NotImplemented
        :param path:
        :param inputFormatClass:
        :param keyClass:
        :param valueClass:
        :param keyConverter:
        :param valueConverter:
        :param conf:
        :param batchSize:
        :return:
        """
        raise NotImplementedError

    def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None,
                        conf=None, batchSize=0):
        """
        Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop
        configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java.
        The mechanism is the same as for sc.sequenceFile.

        :param inputFormatClass: fully qualified classname of Hadoop InputFormat
               (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
        :param keyClass: fully qualified classname of key Writable class (e.g. "org.apache.hadoop.io.Text")
        :param valueClass: fully qualified classname of value Writable class (e.g. "org.apache.hadoop.io.LongWritable")
        :param keyConverter: (None by default)
        :param valueConverter: (None by default)
        :param conf: Hadoop configuration, passed in as a dict (None by default)
        :param batchSize: The number of Python objects represented as a single Java object.
               (default 0, choose batchSize automatically)
        """
        if 'elasticsearch' in inputFormatClass and 'elasticsearch' in valueClass:
            try:
                from elasticsearch import Elasticsearch
            except ImportError:
                raise ImportError('Must have elasticsearch-py installed to use newAPIHadoopRDD with the '
                                  'elasticsearch driver')

            host_name = conf.get('es.nodes')
            host_port = conf.get('es.port')
            index, mapping = conf.get('es.resource', '/').split('/')
            query = conf.get('es.query')

            client = Elasticsearch(hosts=['http://%s:%s' % (host_name, host_port)])
            data = client.search(index=index, doc_type=mapping, body=query)
            data = data.get('hits', {}).get('hits', [])

            # keep only the document id plus its payload, whether it came back as _source or as stored fields
            cleaned_data = []
            for dat in data:
                if '_source' in dat.keys():
                    cleaned_data.append((dat.get('_id'), dat.get('_source', {})))
                elif 'fields' in dat.keys():
                    cleaned_data.append((dat.get('_id'), dat.get('fields')))

            rdd = RDD(cleaned_data, self, None)
            return rdd
        else:
            raise NotImplementedError('Have not implemented %s for newAPIHadoopRDD' % (inputFormatClass, ))
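
    # Illustrative call for the elasticsearch path above. The host, index, and query values are placeholders,
    # and the usual elasticsearch-hadoop class names are shown purely so the 'elasticsearch' substring checks pass:
    #
    #   conf = {
    #       'es.nodes': 'localhost',
    #       'es.port': '9200',
    #       'es.resource': 'my_index/my_type',
    #       'es.query': '{"query": {"match_all": {}}}',
    #   }
    #   rdd = sc.newAPIHadoopRDD(
    #       'org.elasticsearch.hadoop.mr.EsInputFormat',
    #       'org.apache.hadoop.io.NullWritable',
    #       'org.elasticsearch.hadoop.mr.LinkedMapWritable',
    #       conf=conf,
    #   )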

    def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None,
                   conf=None, batchSize=0):
        """
        NotImplemented
        :param path:
        :param inputFormatClass:
        :param keyClass:
        :param valueClass:
        :param keyConverter:
        :param valueConverter:
        :param conf:
        :param batchSize:
        :return:
        """
        raise NotImplementedError

    def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None,
                  batchSize=0):
        """
        NotImplemented
        :param inputFormatClass:
        :param keyClass:
        :param valueClass:
        :param keyConverter:
        :param valueConverter:
        :param conf:
        :param batchSize:
        :return:
        """
        raise NotImplementedError

    def _checkpointFile(self, name, input_deserializer):
        """
        NotImplemented
        :param name:
        :param input_deserializer:
        :return:
        """
        raise NotImplementedError

    def union(self, rdds):
        """
        NotImplemented
        :param rdds:
        :return:
        """
        raise NotImplementedError

    def broadcast(self, value):
        """
        NotImplemented
        :param value:
        :return:
        """
        raise NotImplementedError

    def accumulator(self, value, accum_param=None):
        """
        NotImplemented
        :param value:
        :param accum_param:
        :return:
        """
        raise NotImplementedError

    def addFile(self, path):
        """
        NotImplemented
        :param path:
        :return:
        """
        raise NotImplementedError

    def clearFiles(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def setCheckpointDir(self, dirName):
        """
        NotImplemented
        :param dirName:
        :return:
        """
        raise NotImplementedError

    def _getJavaStorageLevel(self, storageLevel):
        """
        NotImplemented
        :param storageLevel:
        :return:
        """
        raise NotImplementedError

    def setJobGroup(self, groupId, description, interruptOnCancel=False):
        """
        NotImplemented
        :param groupId:
        :param description:
        :param interruptOnCancel:
        :return:
        """
        raise NotImplementedError

    def setLocalProperty(self, key, value):
        """
        NotImplemented
        :param key:
        :param value:
        :return:
        """
        raise NotImplementedError

    def getLocalProperty(self, key):
        """
        NotImplemented
        :param key:
        :return:
        """
        raise NotImplementedError

    def sparkUser(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def cancelJobGroup(self, groupId):
        """
        NotImplemented
        :param groupId:
        :return:
        """
        raise NotImplementedError

    def cancelAllJobs(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def statusTracker(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
        """
        NotImplemented
        :param rdd:
        :param partitionFunc:
        :param partitions:
        :param allowLocal:
        :return:
        """
        raise NotImplementedError

    def show_profiles(self):
        """
        NotImplemented
        :return:
        """
        raise NotImplementedError

    def dump_profiles(self, path):
        """
        NotImplemented
        :param path:
        :return:
        """
        raise NotImplementedError