Apache Pig Tutorial - Python UDFs in Pig



Introduction

  • Apache Pig is a popular system for executing complex Hadoop map-reduce based data-flows. It adds a layer of abstraction on top of Hadoop's map-reduce mechanisms in order to allow developers to take a high-level view of the data and operations on that data.
  • Pig allows us to do things more explicitly. For example, we can join two or more data sources (much like an SQL join). Writing a join as a map and reduce function is a bit of a drag and it's usually worth avoiding.
  • So Pig is great because it simplifies complex tasks - it provides a high-level scripting language that allows users to take more of a big-picture view of their data flow.
  • Pig is especially great because it is extensible. This tutorial will focus on its extensibility.
  • We will be able to write PigLatin scripts that execute Python code as a part of a larger map-reduce workflow. Pig can be extended with other languages too, but for now we'll stick to Python.

User Defined Functions (UDFs)

  • A Pig UDF is a function that is accessible to Pig, but written in a language that isn't PigLatin. Pig allows us to register UDFs for use within a PigLatin script.
  • A UDF needs to fit a specific prototype - we can't just write our function however we want, because then Pig won't know how to call it, what kinds of arguments it needs, or what kind of return value to expect.

UDF types:

Eval UDFs

  • This is the most common type of UDF. It's used in FOREACH type statements. Here's an example of an eval function in action:
users = LOAD 'user_data' AS (name: chararray);
upper_users = FOREACH users GENERATE my_udfs.to_upper_case(name);
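
The Python side of such a UDF might look something like the sketch below (the function name and schema here are assumptions for illustration; the pig_util import and the outputSchema decorator are explained later in this tutorial):

from pig_util import outputSchema

# Hypothetical eval UDF matching the my_udfs.to_upper_case call above:
# takes a chararray (which arrives as a Python string) and upper-cases it.
@outputSchema('name:chararray')
def to_upper_case(name):
    if name is None:
        return None
    return name.upper()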

Aggregation UDFs

  • These are just a special case of an eval UDF. An Aggregate function is usually applied to grouped data. For example:
user_sales = LOAD 'user_sales' AS (name: chararray, price: float);
grouped_sales = GROUP user_sales BY name;
number_of_sales = FOREACH grouped_sales GENERATE group, COUNT(user_sales);
  • In other words, an aggregate UDF is a UDF that is used to combine multiple pieces of information. Here we are aggregating sales data to show how many purchases were made by each user.
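
The built-in COUNT function does the aggregation above, but we could write our own aggregation UDF in Python instead. Here is a minimal sketch that totals each user's sales, assuming the grouped bag arrives as a sequence of (name, price) tuples and using a made-up sum_sales name (the mechanics of writing and registering Python UDFs are covered below):

from pig_util import outputSchema

# Hypothetical aggregation UDF: totals the price field of every tuple
# in the bag that Pig passes in for a group.
@outputSchema('total_sales:double')
def sum_sales(user_sales_bag):
    total = 0.0
    for record in user_sales_bag:
        total += record[1]  # price is the second field of each tuple
    return total

It would be called just like COUNT above, e.g. FOREACH grouped_sales GENERATE group, my_udfs.sum_sales(user_sales);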

Filter UDFs

  • A filter UDF returns a boolean value. If we have a data source with a bunch of rows of which only a portion are useful for the current analysis, then a filter function of some kind comes in handy.

An example of a filter function in action follows:

user_messages = LOAD 'user_twits' AS (name:chararray, message:chararray);
rude_messages = FILTER user_messages by my_udfs.contains_naughty_words(message);
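
The contains_naughty_words UDF referenced above isn't defined in this tutorial, but a minimal sketch of what it could look like (with a made-up word list; again, the Python mechanics are covered below) is:

from pig_util import outputSchema

# Hypothetical filter UDF: True if the message contains a blacklisted word.
NAUGHTY_WORDS = set(['darn', 'heck', 'blast'])

@outputSchema('is_rude:boolean')
def contains_naughty_words(message):
    if message is None:
        return False
    return len(set(message.lower().split()) & NAUGHTY_WORDS) > 0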

Here's about the simplest Python UDF we can write:

from pig_util import outputSchema

@outputSchema('word:chararray')
def hi_world():
    return "hello world"
  • The data output from a function has a specific form. Pig likes it if we specify the schema of the data, because then it knows what it can do with that data. That's what the outputSchema decorator is for. There are a bunch of different ways to specify a schema; we'll get to that in a little bit. Assuming the UDF above is saved in a file (say my_special_udfs.py) and registered under the namespace my_special_udfs (registration is shown at the end of this tutorial), we can call it from a PigLatin script like so:
users = LOAD 'user_data' AS (name: chararray);
hello_users = FOREACH users GENERATE name, my_special_udfs.hi_world();

Specifying the UDF output schema

  • Now a UDF has inputs and outputs. This little section is all about the outputs. Here we'll go over the different ways we can specify the output format of a Python UDF through the use of the outputSchema decorator. We have a few options; here they are:
# our original udf
# it returns a single chararray (that's PigLatin for String)
@outputSchema('word:chararray')
def hi_world():
    return "hello world"
    
# this one returns a Python tuple. Pig recognises the first element 
# of the tuple as a chararray like before, and the next one as a 
# long (a kind of integer)
@outputSchema("word:chararray,number:long")
def hi_everyone():
  return "hi there", 15

#we can use outputSchema to define nested schemas too, here is a bag of tuples
@outputSchema('some_bag:bag{t:(field_1:chararray, field_2:int)}')
def bag_udf():
    return [
        ('hi',1000),
        ('there',2000),
        ('bill',0)
    ]

#and here is a map
@outputSchema('something_nice:map[]')
def my_map_maker():
    return {"a":"b", "c":"d", "e","f"}

So outputSchema can be used to indicate that a function outputs one of, or a combination of, a few basic types.

  • Those types are:
    • chararray: like a string
    • bytearray: a bunch of bytes in a row. Like a string but not as human friendly
    • long: long integer
    • int: normal integer
    • double: floating point number
    • datetime
    • boolean
  • If no schema is specified then Pig assumes that the UDF outputs a bytearray.
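
For example, a UDF defined without the decorator, like the hypothetical sketch below, hands Pig a bytearray, which we would then have to cast in the PigLatin script if we wanted to treat it as a chararray:

# No outputSchema decorator here, so Pig treats the return value as a bytearray.
def hello_no_schema(name):
    return "hello " + name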

UDF arguments

  • Not only does a UDF have outputs, it has inputs as well! This sentence should be filed under 'duh', but we reserved it for a separate section so as not to clutter the discussion of output schemas.

First some UDFs:

# appends some text to a chararray argument
def deal_with_a_string(s1):
    return s1 + " for the win!"

# joins two chararray arguments with a space
def deal_with_two_strings(s1,s2):
    return s1 + " " + s2

# squares an int argument
def square_a_number(i):
    return i*i

# takes a bag and prefixes each of its rows with its position in the bag
def now_for_a_bag(lBag):
    lOut = []
    for i,l in enumerate(lBag):
        lNew = [i] + list(l)  # list(l) so this works whether each row arrives as a tuple or a list
        lOut.append(lNew)
    return lOut

Here is the use of those UDFs in a PigLatin script:

REGISTER 'myudf.py' USING jython AS myudfs;
users = LOAD 'user_data' AS (firstname: chararray, lastname:chararray,some_integer:int);

winning_users    = FOREACH users GENERATE myudfs.deal_with_a_string(firstname);
full_names       = FOREACH users GENERATE myudfs.deal_with_two_strings(firstname,lastname);
squared_integers = FOREACH users GENERATE myudfs.square_a_number(some_integer);

users_by_number = GROUP users by some_integer;
indexed_users_by_number = FOREACH users_by_number GENERATE group,myudfs.now_for_a_bag(users);
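
To get a feel for what now_for_a_bag does with its bag argument, we can exercise it locally with plain Python data. This sketch assumes (based on how the Jython bindings hand bags to a UDF) that each row of the group arrives as a tuple whose fields follow the relation's schema:

# Quick local sanity check of now_for_a_bag, outside of Pig.
sample_bag = [
    ('alice', 'anderson', 3),
    ('bob', 'brown', 3),
]
print(now_for_a_bag(sample_bag))
# [[0, 'alice', 'anderson', 3], [1, 'bob', 'brown', 3]]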
