
An impatient start with Cascading

Over the last couple of years of working with Hadoop, I kept hearing in blogs and at conferences about Cascading, a framework on top of Hadoop for implementing ETL. In our lean start-up project, however, we decided to use Pig and built our data flow on it. Recently I got a new book from O'Reilly Media, "Enterprise Data Workflows with Cascading", and finished two interesting chapters in one breath. This weekend I managed to find a couple of hours to try the examples from the book. The author, Paco Nathan, explains very nicely why and when you should use Cascading instead of Pig or Hive, and he also gives examples to try at home. Today's post is my first impression of Cascading.

All the examples from the book can be found on GitHub. I cloned the project and was ready to run the examples. The Impatient project is compiled and built with Gradle. I ran gradle clean jar and got stuck with the following errors:
Could not resolve all dependencies for configuration ':providedCompile'.
> Could not download artifact 'org.codehaus.jackson:jackson-core-asl:1.8.8@jar'
   > Artifact 'org.codehaus.jackson:jackson-core-asl:1.8.8@jar' not found.

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.
I am not a Gradle geek and could not find a quick fix, so I decided to use Maven to compile and build the examples (it would be great if somebody could point me to a fix, though). Furthermore, the author doesn't describe how to install a local standalone Hadoop; that is certainly out of the scope of the book. However, we need a local standalone Hadoop installation to run the examples. There are plenty of blogs and articles that describe how to install Hadoop, but since this is an impatient start, we will install it as quickly as possible.
1) Download the Hadoop distribution hadoop-1.0.3.zip.
2) Unzip the Hadoop distribution into any folder.
3) Set HADOOP_HOME in the environment and add its bin directory to the PATH.
We are actually ready to go, but it's better to check three files in the conf folder: core-site.xml, hdfs-site.xml and mapred-site.xml. For local use, none of the three files should contain any configuration, i.e. each should look as follows:
<configuration/>
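Steps 2 and 3 and the configuration check can be sketched in shell roughly as follows. This is only a sketch under assumptions: it supposes the archive was unzipped to $HOME/hadoop-1.0.3 (a hypothetical location, use your own), and it overwrites the three site files after saving a .bak copy of each.

```shell
# Assumption: the distribution was unzipped to $HOME/hadoop-1.0.3; adjust to your own path.
export HADOOP_HOME="${HADOOP_HOME:-$HOME/hadoop-1.0.3}"
export PATH="$HADOOP_HOME/bin:$PATH"

# Reset the three site files to an empty configuration,
# keeping a backup of any file that already exists.
mkdir -p "$HADOOP_HOME/conf"
for f in core-site.xml hdfs-site.xml mapred-site.xml; do
  target="$HADOOP_HOME/conf/$f"
  if [ -f "$target" ]; then
    cp "$target" "$target.bak"
  fi
  printf '%s\n%s\n' '<?xml version="1.0"?>' '<configuration/>' > "$target"
done
```

To make the two export lines permanent, they could be added to your shell profile.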
4) Add the following pom.xml file to the part1 directory:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.blu.bigdata</groupId>
  <artifactId>casscading</artifactId>
  <version>1.0</version>
  <packaging>jar</packaging>

  <name>casscading</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>cascading</groupId>
      <artifactId>cascading-core</artifactId>
      <version>2.1.2</version>
    </dependency>
    <dependency>
      <groupId>cascading</groupId>
      <artifactId>cascading-local</artifactId>
      <version>2.1.2</version>
    </dependency>
    <dependency>
      <groupId>cascading</groupId>
      <artifactId>cascading-hadoop</artifactId>
      <version>2.1.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.1.2</version>
    </dependency>
  </dependencies>
  <repositories>
    <repository>
      <id>conjars.org</id>
      <url>http://conjars.org/repo</url>
    </repository>
  </repositories>
    <build>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.2.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>impatient.Main</mainClass>
          
                        </manifest>
                    </archive>

                </configuration>
                <executions>
                    <execution>
                        <id>assemble-all</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
5) Run mvn clean install; it should create a fat jar file, casscading-1.0-jar-with-dependencies.jar, in the target folder.
6) Now we are ready to run the part1 example:
rm -rf output
hadoop jar ./target/casscading-1.0-jar-with-dependencies.jar data/rain.txt ./output/rain
I got the following output on the console:
13/08/11 20:53:11 INFO util.HadoopUtil: resolving application jar from found main method on: impatient.Main
13/08/11 20:53:11 INFO planner.HadoopPlanner: using application jar: /Users/samim/Development/NoSQL/casscading/./target/casscading-1.0-jar-with-dependencies.jar
13/08/11 20:53:11 INFO property.AppProps: using app.id: 96618C129BE0C7C591007BCFB650BAE6
2013-08-11 20:53:11.650 java[2161:1903] Unable to load realm info from SCDynamicStore
13/08/11 20:53:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/08/11 20:53:12 WARN snappy.LoadSnappy: Snappy native library not loaded
13/08/11 20:53:12 INFO mapred.FileInputFormat: Total input paths to process : 1
13/08/11 20:53:12 INFO util.Version: Concurrent, Inc - Cascading 2.1.2
13/08/11 20:53:12 INFO flow.Flow: [] starting
13/08/11 20:53:12 INFO flow.Flow: []  source: Hfs["TextDelimited[['doc_id', 'text']->[ALL]]"]["data/rain.txt"]"]
13/08/11 20:53:12 INFO flow.Flow: []  sink: Hfs["TextDelimited[[UNKNOWN]->['doc_id', 'text']]"]["output/rain"]"]
13/08/11 20:53:12 INFO flow.Flow: []  parallel execution is enabled: false
13/08/11 20:53:12 INFO flow.Flow: []  starting jobs: 1
13/08/11 20:53:12 INFO flow.Flow: []  allocating threads: 1
13/08/11 20:53:12 INFO flow.FlowStep: [] at least one sink does not exist
13/08/11 20:53:12 INFO flow.FlowStep: [] source modification date at: Sun Aug 11 18:50:44 MSK 2013
13/08/11 20:53:12 INFO flow.FlowStep: [] starting step: (1/1) output/rain
13/08/11 20:53:12 INFO mapred.FileInputFormat: Total input paths to process : 1
13/08/11 20:53:13 INFO flow.FlowStep: [] submitted hadoop job: job_local_0001
13/08/11 20:53:13 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
13/08/11 20:53:13 INFO io.MultiInputSplit: current split input path: file:/Users/samim/Development/NoSQL/casscading/data/rain.txt
13/08/11 20:53:13 INFO mapred.MapTask: numReduceTasks: 0
13/08/11 20:53:13 INFO hadoop.FlowMapper: cascading version: Concurrent, Inc - Cascading 2.1.2
13/08/11 20:53:13 INFO hadoop.FlowMapper: child jvm opts: -Xmx200m
13/08/11 20:53:13 INFO hadoop.FlowMapper: sourcing from: Hfs["TextDelimited[['doc_id', 'text']->[ALL]]"]["data/rain.txt"]"]
13/08/11 20:53:13 INFO hadoop.FlowMapper: sinking to: Hfs["TextDelimited[[UNKNOWN]->['doc_id', 'text']]"]["output/rain"]"]
13/08/11 20:53:13 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
13/08/11 20:53:13 INFO mapred.LocalJobRunner: 
13/08/11 20:53:13 INFO mapred.Task: Task attempt_local_0001_m_000000_0 is allowed to commit now
13/08/11 20:53:13 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_m_000000_0' to file:/Users/samim/Development/NoSQL/casscading/output/rain
13/08/11 20:53:16 INFO mapred.LocalJobRunner: file:/Users/samim/Development/NoSQL/casscading/data/rain.txt:0+510
13/08/11 20:53:16 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
13/08/11 20:53:18 INFO util.Hadoop18TapUtil: deleting temp path output/rain/_temporary
7) To view the results:
cat ./output/rain/*
Here is the output:
doc_id text
doc01 A rain shadow is a dry area on the lee back side of a mountainous area.
doc02 This sinking, dry air produces a rain shadow, or area in the lee of a mountain with less rain and cloudcover.
doc03 A rain shadow is an area of dry land that lies on the leeward (or downwind) side of a mountain.
doc04 This is known as the rain shadow effect and is the primary cause of leeward deserts of mountain ranges, such as California's Death Valley.
doc05 Two Women. Secrets. A Broken Land. [DVD Australia]
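As far as I understand the part1 example, it is a plain copy of the input file, so one quick sanity check is to compare the result with data/rain.txt. A minimal sketch follows; check_copy is a hypothetical helper name of mine, and it assumes Hadoop wrote a single part-* file into the output directory.

```shell
# check_copy INPUT OUTPUT_DIR
# Succeeds when the part-* files in OUTPUT_DIR contain exactly
# the same lines as INPUT (line order is ignored).
check_copy() {
  input="$1"
  outdir="$2"
  expected=$(sort "$input")
  # Only part-* files are read, so markers such as _SUCCESS are skipped.
  actual=$(cat "$outdir"/part-* | sort)
  [ "$expected" = "$actual" ]
}
```

For the run above it could be called as check_copy data/rain.txt ./output/rain && echo "output matches input".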
Everything seems to work fine. I also ran the word count example:
hadoop jar ./target/casscading-1.0-jar-with-dependencies.jar data/rain.txt ./output/wc
What I am really interested in is manipulating Cassandra data through Cascading, and also in Lingual. I have already found a few GitHub projects that implement a Cassandra Tap and Sink for easy integration; thank you guys for the nice work.
