# Hive

## Prepare data

```bash
cat /data/NASA_access_log_Jul95 | awk -F' ' '{print "\""$4 $5"\","$(NF-1)","$(NF)}' > nasa.csv
```

## Prepare DB

```
hive> create database luckow;
Time taken: 0.122 seconds
hive> use luckow;
Time taken: 0.011 seconds
```

## Simple Table

```sql
CREATE TABLE nasa (
  access_time   STRING,
  response_code STRING,
  response_time INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

LOAD DATA LOCAL INPATH '/home/luckow/nasa.csv' OVERWRITE INTO TABLE nasa;
```

## Run Queries

```sql
select response_code, count(*) from nasa group by response_code;
```

```
hive> select response_code, count(*) from nasa group by response_code;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1414187303892_0008, Tracking URL = http://ip-10-186-164-81.ec2.internal:8088/proxy/application_1414187303892_0008/
Kill Command = /opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/hadoop/bin/hadoop job -kill job_1414187303892_0008
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2014-10-25 13:23:19,557 Stage-1 map = 0%, reduce = 0%
2014-10-25 13:23:28,859 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.1 sec
2014-10-25 13:23:39,201 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 4.71 sec
MapReduce Total cumulative CPU time: 4 seconds 710 msec
Ended Job = job_1414187303892_0008
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1  Reduce: 1  Cumulative CPU: 4.71 sec  HDFS Read: 73439116  HDFS Write: 81  SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 710 msec
OK
200    1701534
302    46573
304    132627
400    5
403    54
404    10845
500    62
501    14
```

## Test Compression

```sql
CREATE TABLE nasa_orc (
  access_time   STRING,
  response_code STRING,
  response_time INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS orc tblproperties ("orc.compress"="SNAPPY");

insert overwrite table nasa_orc select * from nasa;

CREATE TABLE nasa_orc_gzip (
  access_time   STRING,
  response_code STRING,
  response_time INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS orc tblproperties ("orc.compress"="ZLIB");

insert overwrite table nasa_orc_gzip select * from nasa;

CREATE TABLE nasa_parquet (
  access_time   STRING,
  response_code STRING,
  response_time INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS parquetfile;

insert overwrite table nasa_parquet select * from nasa;
```

### Show file sizes

Compare the on-disk size of each table in the HDFS NameNode UI:

http://ec2-54-83-41-227.compute-1.amazonaws.com:50070/explorer.html#/user/hive/warehouse/luckow.db

### Test runtimes

# Impala

- Open shell: `impala-shell`
- Connect: `connect ip-10-142-176-20;`
- Make sure the Hive tables show up: `invalidate metadata;`
- Run query:

```
[ip-10-142-176-20:21000] > select response_code, count(*) from nasa group by response_code;
Query: select response_code, count(*) from nasa group by response_code
+---------------+----------+
| response_code | count(*) |
+---------------+----------+
| 200           | 1701534  |
| 403           | 54       |
| 304           | 132627   |
| 500           | 62       |
| 404           | 10845    |
| alyssa.p      | 1        |
| 302           | 46573    |
| 501           | 14       |
| 400           | 5        |
+---------------+----------+
Fetched 9 row(s) in 2.23s
```

The stray `alyssa.p` row (not present in the Hive output above) most likely comes from a malformed log line, where the awk field extraction picks up part of a hostname instead of a response code.
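For a rough side-by-side runtime comparison of the storage formats, the same aggregation can also be timed from PySpark. This is a minimal sketch, assuming the `pyspark` shell and `HiveContext` setup shown in the Spark section below; the timing loop and output formatting are illustrative only, not a proper benchmark.

```python
import time

from pyspark.sql import HiveContext

# sc is the SparkContext provided by the pyspark shell.
sqlContext = HiveContext(sc)

# Time the same GROUP BY against each storage format (wall-clock only).
for table in ["luckow.nasa", "luckow.nasa_orc", "luckow.nasa_orc_gzip", "luckow.nasa_parquet"]:
    start = time.time()
    rows = sqlContext.sql(
        "select response_code, count(*) from %s group by response_code" % table).collect()
    print "%s: %d groups in %.2f s" % (table, len(rows), time.time() - start)
```

The table only holds roughly 1.9 million rows, so absolute differences will be small; the HDFS file sizes from the previous section tend to show the effect of the formats more clearly.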
# Spark

Spark SQL against the Hive metastore, plus a simple MLlib KMeans example, run from the `pyspark` shell (`sc` is the SparkContext created by the shell):

```python
# sc is an existing SparkContext.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results = sqlContext.sql("FROM src SELECT key, value").collect()

from pyspark.mllib.clustering import KMeans
from numpy import array
from math import sqrt

####################################################################################
# Load and parse the data
# Upload sample data first: hadoop fs -put /data/kmeans_data.txt .
data = sc.textFile("kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, runs=10,
                        initializationMode="random")
print str(clusters.clusterCenters)

# Assign each point to its nearest cluster center
parsedData.map(lambda point: (str(point), clusters.predict(point))).collect()
```
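To sanity-check the clustering, a common follow-up is to compute the within-set sum of squared errors (WSSSE). This is a minimal sketch that reuses the `clusters` model and `parsedData` RDD from the block above (and the `sqrt` import already made there):

```python
# Evaluate the clustering by computing the Within Set Sum of Squared Errors (WSSSE).
def error(point):
    center = clusters.clusterCenters[clusters.predict(point)]
    return sqrt(sum([x ** 2 for x in (point - center)]))

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print "Within Set Sum of Squared Error = " + str(WSSSE)
```

A lower WSSSE generally indicates tighter clusters; re-running the training with different values of k and comparing WSSSE is the usual way to pick the number of clusters.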