Warung Bebas

Sunday, 25 November 2012

BigData: HDFS FeatureClass ETL and MapReduce GPTool


This post is dedicated to my Esri colleagues Ajit D. and Philip H. for their invaluable help.

This is a work in progress, but I've put a good enough dent in it that I would like to share it with you. In this post, we will go through a complete cycle where, from ArcMap, we will:

  • Export a FeatureClass to an HDFS folder
  • Register that folder as a Hive table
  • Run command line Hive queries
  • Execute Hive queries from ArcPy and show the results in ArcMap
  • Execute a MapReduce Job as a GP Tool
  • Import an HDFS Folder (result of MapReduce Job) as a FeatureClass

This post brings everything that I have been blogging about so far into a nice story, so here we go:

BTW - I am assuming that you have a Hadoop instance running somewhere and are familiar with ArcMap. You can download a Hadoop demo VM for local testing.

Download the ArcMap extension in this file and unzip its content into your ArcGIS\Desktop10.1\java\lib\ext folder - The jars have to be children of the ext folder.

Make sure to adjust the ArcGIS JVM using the JavaConfigTool.exe located in ArcGIS\Desktop10.1\bin:

Start ArcMap, create a new toolbox and add to it the Hadoop Tools - Check out this help for detailed information on managing toolboxes:

Add the world cities to ArcMap:

Let's export the world cities to HDFS:

This tool iterates over the input FeatureClass features and stores each feature in the specified HDFS output path. The output path content is text formatted and each feature is stored as a line in an Esri JSON text representation followed by a carriage return.
This line-per-feature layout enables us to keep appending new records from, for example, a streaming process such as Esri GeoEvent Server.
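To make the storage format concrete, here is a minimal Python sketch of how one point feature could be written as a single Esri JSON text line. It is not the export tool's actual code; the function name `feature_to_line` and the approximate sample coordinates are assumptions made for illustration.

```python
import json

def feature_to_line(x, y, attributes, wkid=4326):
    """Serialize one point feature as a single-line Esri JSON record."""
    feature = {
        "geometry": {"x": x, "y": y, "spatialReference": {"wkid": wkid}},
        "attributes": attributes,
    }
    # Without an indent argument json.dumps never emits a line break,
    # so every feature stays on exactly one line of the HDFS text file.
    return json.dumps(feature, separators=(",", ":"))

# Illustrative record only (approximate coordinates, hypothetical helper name).
line = feature_to_line(35.5, 33.9, {"CITY_NAME": "Beirut", "CNTRY_NAME": "Lebanon"})
print(line)
```

Because each record is self-contained on its own line, appending a new feature is just appending one more line to a file in the HDFS folder.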

The metadata for that HDFS based FeatureClass is stored in an HDFS based 'metastore' for other processes to inspect - A better place would have been ZooKeeper - but that is a post for another day.

Here is a sample metadata file:

{
"wkid": 4326,
"geometryType": 1,
"fields": [{
"name": "ObjectID",
"alias": "ObjectID",
"type": 6,
"length": 4
}, {
"name": "Shape",
"alias": "Shape",
"type": 7,
"length": 0
}, {
"name": "CITY_NAME",
"alias": "CITY_NAME",
"type": 4,
"length": 30
}, {
...
}, {
"name": "LABEL_FLAG",
"alias": "LABEL_FLAG",
"type": 1,
"length": 4
}]
}

The metastore contains a set of files where by convention the file name is the imported FeatureClass name followed by ".json". For example:

$ hadoop fs -cat hdfs://localhadoop:9000/user/mraad/metastore/worldcities.json
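Once a metadata file has been fetched (for example with the hadoop fs -cat command above), another process can decode it. The sketch below is one way to do that in Python, assuming the numeric codes follow the esriFieldType and esriGeometryType enumerations (6 = OID, 7 = Geometry, 4 = String, 1 = Integer; geometry type 1 = Point). The `describe` helper is hypothetical, and the sample keeps only the fields shown above.

```python
import json

# Subsets of the esriFieldType and esriGeometryType enumerations.
FIELD_TYPES = {1: "Integer", 3: "Double", 4: "String", 6: "OID", 7: "Geometry"}
GEOMETRY_TYPES = {1: "Point", 2: "Multipoint", 3: "Polyline", 4: "Polygon"}

def describe(metadata_text):
    """Return (wkid, geometry type name, [(field name, field type name), ...])."""
    meta = json.loads(metadata_text)
    fields = [(f["name"], FIELD_TYPES.get(f["type"], "Unknown"))
              for f in meta["fields"]]
    geometry = GEOMETRY_TYPES.get(meta["geometryType"], "Unknown")
    return meta["wkid"], geometry, fields

# Shortened copy of the sample metadata (elided fields are left out).
sample = """{"wkid": 4326, "geometryType": 1, "fields": [
  {"name": "ObjectID", "alias": "ObjectID", "type": 6, "length": 4},
  {"name": "Shape", "alias": "Shape", "type": 7, "length": 0},
  {"name": "CITY_NAME", "alias": "CITY_NAME", "type": 4, "length": 30},
  {"name": "LABEL_FLAG", "alias": "LABEL_FLAG", "type": 1, "length": 4}]}"""

wkid, geometry_type, fields = describe(sample)
print(wkid, geometry_type, fields)
```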

The GP import tool adds one more file to the metastore, a Hive script that you can execute from the Hive command line to create an external table referencing the HDFS FeatureClass. Again, by convention the script name is the name of the imported FeatureClass followed by ".hql". For example:

$ hadoop fs -cat hdfs://localhadoop:9000/user/mraad/metastore/worldcities.hql

You can "cat" the content of the script and you will notice the usage of the collection data type for the feature geometry and attribute representation. In addition, the serialization and deserialization (SerDe) from JSON is based on a Cloudera library found in the article 'Analyzing Twitter Data Using CDH'. You can download the jar from here.


ADD JAR hive-serdes-1.0-SNAPSHOT.jar;
CREATE EXTERNAL TABLE IF NOT EXISTS worldcities (
geometry STRUCT <x:DOUBLE,y:DOUBLE,spatialReference:STRUCT <wkid:INT>>,
attributes STRUCT <
CITY_NAME:STRING,
GMI_ADMIN:STRING,
ADMIN_NAME:STRING,
FIPS_CNTRY:STRING,
CNTRY_NAME:STRING,
STATUS:STRING,
POP_RANK:INT,
POP_CLASS:STRING,
PORT_ID:INT,
LABEL_FLAG:INT
>) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION 'hdfs://localhadoop:9000/user/mraad_admin/worldcities'
TBLPROPERTIES ('wkid'='4326','type'='point');


Please note how tables can have properties - in this case, I added the wkid and geometry type.
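For the curious, a script like the one above could be derived from the metadata along the following lines. This is only a sketch: the tool's real generation logic is not shown in this post, the `hive_ddl` helper is hypothetical, it handles point geometry only, and the field-type mapping (1 to INT, 3 to DOUBLE, 4 to STRING) is an assumption.

```python
# esriFieldType code -> Hive column type (assumed mapping, attribute fields only).
HIVE_TYPES = {1: "INT", 3: "DOUBLE", 4: "STRING"}

def hive_ddl(table, metadata, location):
    """Build a CREATE EXTERNAL TABLE statement for a point FeatureClass."""
    # ObjectID (type 6) and Shape (type 7) are skipped: the geometry gets
    # its own STRUCT column and the other fields go into 'attributes'.
    columns = ",\n".join(
        "{}:{}".format(f["name"], HIVE_TYPES[f["type"]])
        for f in metadata["fields"] if f["type"] in HIVE_TYPES
    )
    return (
        "CREATE EXTERNAL TABLE IF NOT EXISTS {table} (\n"
        "geometry STRUCT <x:DOUBLE,y:DOUBLE,spatialReference:STRUCT <wkid:INT>>,\n"
        "attributes STRUCT <\n{columns}\n"
        ">) ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'\n"
        "LOCATION '{location}'\n"
        "TBLPROPERTIES ('wkid'='{wkid}','type'='point');"
    ).format(table=table, columns=columns, location=location,
             wkid=metadata["wkid"])

metadata = {"wkid": 4326, "geometryType": 1, "fields": [
    {"name": "ObjectID", "type": 6},
    {"name": "Shape", "type": 7},
    {"name": "CITY_NAME", "type": 4},
    {"name": "POP_RANK", "type": 1},
]}
ddl = hive_ddl("worldcities", metadata,
               "hdfs://localhadoop:9000/user/mraad_admin/worldcities")
print(ddl)
```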

Upon the execution of the above commands, you can now query the table. Here are some sample queries:

hive> select * from worldcities limit 10;
hive> select attributes.city_name,geometry.x,geometry.y from worldcities where attributes.cntry_name='Lebanon';

Hive can be accessed using ArcPy through the Thrift protocol - Here is a Toolbox that enables the user to draw a polygon as input and invoke a Hive spatial UDF, constraining the resulting FeatureSet to the world cities within the drawn polygon. Download the UDF jar from here, and place it and the hive-serde jar in the same location where you will start the Hive server as follows:

$> hive --service hiveserver
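The UDF's name and implementation are not listed in this post, so the following is only an illustration of the kind of "within polygon" predicate such a UDF evaluates for every city. It is a plain ray-casting test in planar coordinates; the function name and the sample box are assumptions.

```python
def point_in_polygon(x, y, ring):
    """Return True if (x, y) lies inside the ring, a list of (x, y) vertices."""
    inside = False
    n = len(ring)
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]
        # Only edges that straddle the horizontal line through the point matter.
        if (y1 > y) != (y2 > y):
            # X coordinate where the edge crosses that horizontal line.
            x_cross = x1 + (y - y1) * (x2 - x1) / float(y2 - y1)
            # Each crossing to the right of the point flips the inside state.
            if x < x_cross:
                inside = not inside
    return inside

# Hypothetical user-drawn box and two test points.
box = [(35.0, 33.0), (37.0, 33.0), (37.0, 35.0), (35.0, 35.0)]
print(point_in_polygon(35.5, 33.9, box))
print(point_in_polygon(10.0, 10.0, box))
```

Pushing the filter into Hive this way means only the matching cities travel back over Thrift to ArcMap.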

Next, I wanted to demo the capability to run a MapReduce Job as a GP Tool.

Quick MapReduce recap. For the Unix geeks:

$> cat input.txt | map | sort | reduce > output.txt

And for the "academics":

map(K1,V1) emit list(K2,V2)
shuffle/sort K2
reduce(K2,list(V2)) emit list(K3,V3)
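The same three phases can be simulated in memory with a few lines of Python. This is a toy driver to show the data flow, not Hadoop code; `run_mapreduce` and the word-count mapper and reducer are illustrative names.

```python
from itertools import groupby
from operator import itemgetter

def run_mapreduce(records, mapper, reducer):
    # map: every (K1, V1) record may emit any number of (K2, V2) pairs
    pairs = [kv for k1, v1 in records for kv in mapper(k1, v1)]
    # shuffle/sort: order by K2 so that equal keys become adjacent
    pairs.sort(key=itemgetter(0))
    # reduce: each K2 is handed over together with the list of its V2 values
    output = []
    for k2, group in groupby(pairs, key=itemgetter(0)):
        output.extend(reducer(k2, [v2 for _, v2 in group]))
    return output

def mapper(line_number, line):
    # Emit (word, 1) for every word on the line.
    for word in line.split():
        yield word, 1

def reducer(word, counts):
    # Emit the total number of occurrences of the word.
    yield word, sum(counts)

records = list(enumerate(["beirut tripoli", "beirut sidon"]))
result = run_mapreduce(records, mapper, reducer)
print(result)
```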

This is fairly low level and requires explicit writing of the Mapper and Reducer Java classes.  This is not for your average GIS user.  But I can see a time when advanced users will write parameter driven MapReduce tools and share them with the community. This is all based on 'How to build custom geoprocessing tools'.

This simple MR tool takes as input the world cities HDFS FeatureClass and finds the "centroids" by country of all the cities with a specific population rank.

BTW, this can easily be written in HQL as follows:

select attributes.cntry_name as name,
avg(geometry.x) as x,
avg(geometry.y) as y,
count(attributes.cntry_name) as cnt
from worldcities
where attributes.pop_rank < 6
group by attributes.cntry_name
having cnt > 10;

The JobRunnerTool accepts as input:

  • A set of Hadoop properties
  • HDFS FeatureClass input path
  • HDFS FeatureClass output path
  • Metastore location

The mapper converts a JSON-formatted input text line (V1) into a PointFeature and will emit its point geometry (V2) if it meets a filter criterion - in this case, a population rank that is less than a user-defined value. The mapper output key (K2) is the country name. BTW, K1 is the position of the line in the input file (a byte offset under Hadoop's default TextInputFormat).

The shuffle/sort portion will ensure that each reducer will receive a country name (K2) as an input key and a list of geometry points (V2) as input values.

The reducer averages the coordinates and creates a PointFeature whose geometry is a point with values based on the averaged calculations. The attributes will include the country name and the number of points used in the averaging. The reducer key output (K3) will be the JSON-formatted text representation of the PointFeature and the output value (V3) will be NULL, thus producing an HDFS FeatureClass with its metadata in the metastore for further processing and inspection.
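The real tool is written as Java Mapper and Reducer classes, but the logic described above can be sketched in Python to show what each phase does. Everything here is an assumption made for illustration: the function names, the output attribute name COUNT, and the made-up sample coordinates.

```python
import json
from collections import defaultdict

def mapper(line, max_rank):
    """Emit (country, (x, y)) for a city whose POP_RANK is below max_rank."""
    feature = json.loads(line)
    attrs = feature["attributes"]
    if attrs["POP_RANK"] < max_rank:
        geom = feature["geometry"]
        yield attrs["CNTRY_NAME"], (geom["x"], geom["y"])

def reducer(country, points):
    """Average the points; emit the JSON feature as key and None as value."""
    n = len(points)
    feature = {
        "geometry": {"x": sum(p[0] for p in points) / float(n),
                     "y": sum(p[1] for p in points) / float(n)},
        "attributes": {"CNTRY_NAME": country, "COUNT": n},
    }
    yield json.dumps(feature), None

def run(lines, max_rank):
    grouped = defaultdict(list)  # stands in for the shuffle/sort phase
    for line in lines:
        for country, point in mapper(line, max_rank):
            grouped[country].append(point)
    output = []
    for country in sorted(grouped):
        for key, _ in reducer(country, grouped[country]):
            output.append(key)
    return output

def city(x, y, country, rank):
    # Helper that builds one made-up input line for the demo.
    return json.dumps({"geometry": {"x": x, "y": y},
                       "attributes": {"CNTRY_NAME": country, "POP_RANK": rank}})

lines = [city(0, 0, "A", 1), city(2, 4, "A", 1), city(10, 10, "A", 7)]
centroids = [json.loads(text) for text in run(lines, 6)]
print(centroids)
```

The third city is dropped by the mapper's filter, so only two points reach the reducer for country "A".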

Lastly, we close the cycle by importing the HDFS FeatureClass.

The tool accepts as input an HDFS FeatureClass, its metadata and an output location. When executed within ArcMap, the output is automatically added to the display.
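The reading side of that import can be sketched as follows: parse each Esri JSON text line back into a coordinate pair plus an attribute dictionary. This is not the tool's code; `read_features` is a hypothetical helper and the sample lines are made up. In ArcMap, the resulting tuples would then be written to the output FeatureClass, for example with an ArcPy insert cursor.

```python
import json

def read_features(lines):
    """Parse Esri JSON text lines into (x, y, attributes) tuples."""
    features = []
    for line in lines:
        line = line.strip()  # drop the trailing line terminator
        if not line:
            continue         # tolerate blank lines
        record = json.loads(line)
        geom = record["geometry"]
        features.append((geom["x"], geom["y"], record["attributes"]))
    return features

# Made-up content standing in for a part file produced by the MapReduce job.
text = (
    '{"geometry":{"x":1.0,"y":2.0},"attributes":{"CNTRY_NAME":"A","COUNT":2}}\r\n'
    '{"geometry":{"x":5.5,"y":6.5},"attributes":{"CNTRY_NAME":"B","COUNT":3}}\r\n'
    '\r\n'
)
features = read_features(text.split("\n"))
print(features)
```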

Things to work on next:
  • UDF that accepts as input an ArcMap generated FeatureSet into the DistributedCache - I already blogged about this, but as standalone.
  • MapReduceTool that accepts external jar containing mapper/reducer classes - I think this will pave the way for the advanced users.

Stay tuned for more things to come. As usual, all the source code can be downloaded from here.
