EXTRACT Expression (U-SQL)
Summary
One of U-SQL’s core capabilities is the ability to schematize unstructured data on the fly without having to create a metadata object for it. This capability is provided by the EXTRACT expression, which invokes either a user-defined extractor or a built-in extractor to process the input file or set of files specified in the FROM clause, and produces a rowset whose schema is specified in the EXTRACT clause.
The extraction is processed in parallel unless the extractor specifies otherwise. Even a single file will be split into parts that are then processed in parallel. The degree of parallelism depends on how big the files are, how many files there are, the job’s specified degree of parallelism, and so on. For more information about the processing model of extractors, please refer to the U-SQL Programmability Guide.
Syntax
Extract_Expression := 'EXTRACT' Column_Definition_List Extract_From_Clause USING_Clause.
Remarks
Column_Definition_List
This list defines the schema of the extraction. The extracted columns are defined in the EXTRACT clause as a pair of column names and column types:
Syntax
Column_Definition_List := Column_Definition { ',' Column_Definition}.
Column_Definition
A column definition is of the form
Syntax
Column_Definition := Quoted_or_Unquoted_Identifier Built_in_Type.
Each column has an identifier, which can be either an unquoted or a quoted identifier, and is typed with one of the built-in U-SQL types; the type has to be supported by the extractor.
If the files are being specified with a file set, then the column definition list also needs to include the so-called virtual columns that are used in the file set pattern of the EXTRACT expression’s FROM clause, together with their types.
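As an illustration (the path, pattern, and column names here are hypothetical), an EXTRACT over a file set pattern declares its virtual column alongside the regular columns:

```usql
// {filedate:...} is a virtual column defined by the file set pattern;
// it must also appear in the column definition list.
@rows =
    EXTRACT user     string,
            amount   int,
            filedate DateTime   // virtual file set column
    FROM "/input/{filedate:yyyy}/{filedate:MM}/{filedate:dd}/data.csv"
    USING Extractors.Csv();
```

The full examples later in this article show how such virtual columns are then used in query predicates to select the actual files.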
Extract_From_Clause
The EXTRACT expression’s FROM clause designates the source of the data to be extracted in the form of an Input_File_Path that is either a file path, a comma-separated list of file paths, or a pattern over a set of files (follow the link for more details on the different supported file paths and patterns):
Syntax
Extract_From_Clause := 'FROM' Input_File_Path.
The Input_File_Path is specified as a string literal, a reference to a string-typed variable, or a constant-foldable expression.
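For illustration, the three forms might look as follows (the paths here are hypothetical):

```usql
// 1. String literal
@a = EXTRACT c string FROM "/data/a.csv" USING Extractors.Csv();

// 2. Reference to a string-typed variable
DECLARE @path string = "/data/a.csv";
@b = EXTRACT c string FROM @path USING Extractors.Csv();

// 3. Constant-foldable expression
DECLARE @dir string = "/data/";
@c = EXTRACT c string FROM @dir + "a.csv" USING Extractors.Csv();
```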
USING_Clause
The USING clause specifies which extractor should be used to turn the file(s) into a rowset.
Syntax
USING_Clause := 'USING' udo_expression.
It takes a C# expression that returns an instance of IExtractor. U-SQL provides a small set of predefined extractors for common text formats, and users can write their own by implementing IExtractor; see U-SQL Programmability Guide: User-Defined Extractor for more detail on how to write your own extractor. The built-in extractors are part of the built-in Extractors namespace. See U-SQL Built-in Extractors for more information on the built-in extractors and their arguments.
Extraction from compressed data
In general, the files are passed as-is to the UDO. One exception is that EXTRACT recognizes GZip-compressed files with the file extension .gz and automatically decompresses them as part of the extraction process. The actual UDO will see the uncompressed data. For any other compression scheme, users have to write their own custom extractor. Note that U-SQL has an upper limit of 4 GB on a GZip-compressed file. If you apply your EXTRACT expression to a file larger than this limit, the error E_RUNTIME_USER_MAXCOMPRESSEDFILESIZE is raised during the execution of the job.
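Extracting from a GZip-compressed file requires no special syntax; the .gz extension alone triggers the decompression. A minimal sketch (the file path is hypothetical):

```usql
// The built-in extractor sees the decompressed TSV data.
@rows =
    EXTRACT UserId int,
            Query  string
    FROM "/Samples/Data/SearchLog.tsv.gz"
    USING Extractors.Tsv();
```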
Examples
- The examples can be executed in Visual Studio with the Azure Data Lake Tools plug-in.
- The first example below uses the sample data provided with your Data Lake Analytics account. See Prepare source data for additional information.
- For simplicity, the example(s) with user-defined code make use of Code-Behind for assembly management. The main advantage of Code-Behind is that the tooling will register the assembly file and add the REFERENCE ASSEMBLY statement automatically. To use Assembly registration instead of Code-Behind, see Using Assemblies: Code-Behind vs. Assembly Registration Walkthrough.
Using Built-In Extractor
The following EXTRACT expression schematizes the file SearchLog.tsv in the /Samples/Data directory of the default Azure Data Lake Storage by using the built-in TSV format extractor:
@searchlog =
EXTRACT UserId int
, Start DateTime
, Region string
, Query string
, Duration int
, Urls string
, ClickedUrls string
FROM "/Samples/Data/SearchLog.tsv"
USING Extractors.Tsv();
Extractor with ORDER BY and FETCH
The ORDER BY clause with FETCH allows the selection of a limited number of rows based on the specified order.
// Only extracts top 10 records by Start date
@searchlog =
EXTRACT UserId int
, Start DateTime
, Region string
, Query string
, Duration int
, Urls string
, ClickedUrls string
FROM "/Samples/Data/SearchLog.tsv"
USING Extractors.Tsv()
ORDER BY Start DESC FETCH 10 ROWS;
OUTPUT @searchlog
TO "/ReferenceGuide/QSE/Extract/SearchLog_extracted.txt"
USING Outputters.Tsv();
One directory with multiple files
The following EXTRACT expression schematizes the files specified by the file set in the FROM clause. The schema contains the additional virtual columns vid and date that are part of the file set pattern; they are used in later query expressions to identify the actual selected files.
// Sample file naming convention: vehicle1_09142014.csv, vehicle2_09142014.csv
DECLARE @dir string = "/Samples/Data/AmbulanceData/";
DECLARE @file_set_path string = @dir + "vehicle{vid}_{date:MM}{date:dd}{date:yyyy}.csv";
@data =
EXTRACT vehicle_id int,
entry_id long,
event_date DateTime,
latitude float,
longitude float,
speed int,
direction string,
trip_id int?,
vid int, // virtual file set column
date DateTime // virtual file set column
FROM @file_set_path
USING Extractors.Csv();
@result =
SELECT vehicle_id, entry_id, event_date, latitude,longitude,
speed, direction, trip_id, vid,
date.ToString("yyyy-MM-dd") AS fileDate
FROM @data
WHERE date >= DateTime.Parse("2014-08-31") AND date < DateTime.Parse("2014-10-31")
AND vid == 1;
OUTPUT @result
TO "/ReferenceGuide/QSE/Extract/Example1.txt"
USING Outputters.Csv();
Multiple directories with multiple files
// Sample file naming convention: vehicle1_09142014.csv, vehicle2_09142014.csv
// Sample directory naming convention: /Samples/Data/AmbulanceData/2014/09/15/, /Samples/Data/AmbulanceData/2014/09/14/
DECLARE @file_set_path2 string = @dir + "{date:yyyy}/{date:MM}/{date:dd}/vehicle{vid}_{date:MM}{date:dd}{date:yyyy}.csv";
@data =
EXTRACT vehicle_id int,
entry_id long,
event_date DateTime,
latitude float,
longitude float,
speed int,
direction string,
trip_id int?,
vid int, // virtual file set column
date DateTime // virtual file set column
FROM @file_set_path2
USING Extractors.Csv();
@result =
SELECT *
FROM @data
WHERE date == DateTime.Parse("2014-09-15")
AND vid == 2;
OUTPUT @result
TO "/ReferenceGuide/QSE/Extract/Example2.txt"
USING Outputters.Csv();
User-Defined Extractor - SampleExtractor
C# code is placed in the associated Code-Behind (.cs) file. See the usage in the next section, below.
using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
namespace ReferenceGuide_Examples
{
public class SampleExtractor : IExtractor
{
private Encoding _encoding;
private byte[] _row_delim;
private char _col_delim;
public SampleExtractor(Encoding encoding, string row_delim = "\r\n", char col_delim = '\t')
{
this._encoding = ((encoding == null) ? Encoding.UTF8 : encoding);
this._row_delim = this._encoding.GetBytes(row_delim);
this._col_delim = col_delim;
}
public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
{
string line;
//Read the input line by line
foreach (Stream current in input.Split(this._row_delim))
{
using (StreamReader streamReader = new StreamReader(current, this._encoding))
{
line = streamReader.ReadToEnd().Trim();
//Split the input by the column delimiter
string[] parts = line.Split(this._col_delim);
int count = 0;
foreach (string part in parts)
{
//If it's the second column, treat it specially: split the column into first-name and last-name columns
if (count == 1)
{
// identify number of parts in name
int nameCount = part.Trim().Count(f => f == ' ');
// two part name
if (nameCount == 1)
{
string[] name = part.Trim().Split(' ');
output.Set<string>(count, name[0]);
count += 1;
output.Set<string>(count, name[1]);
}
// three part name
else if (nameCount == 2)
{
string[] name = part.Trim().Split(' ');
// combine first & middle name
output.Set<string>(count, name[0] + " " + name[1]);
// alternatively skip the middle name
// output.Set<string>(count, name[0]);
count += 1;
output.Set<string>(count, name[2]);
}
// one part name
else
{
output.Set<string>(count, part);
count += 1;
// No last name available: clear the last-name column so it does not
// keep the value from the previous row.
output.Set<string>(count, string.Empty);
}
}
else
{
output.Set<string>(count, part);
}
count += 1;
}
}
yield return output.AsReadOnly();
}
yield break;
}
}
}
Using User-Defined Extractor - SampleExtractor
Extracts first and last names as separate columns using the custom extractor defined above.
@drivers =
EXTRACT id string,
first_name string,
last_name string,
address string,
city string,
region string,
zipcode string,
country string,
phone_numbers string
FROM "/Samples/Data/AmbulanceData/Drivers.txt"
USING new ReferenceGuide_Examples.SampleExtractor(Encoding.Unicode);
OUTPUT @drivers
TO "/ReferenceGuide/QSE/Extract/Drivers.txt"
USING Outputters.Tsv(encoding:Encoding.UTF8, quoting:false);
User-Defined Extractor - DriverExtractor
C# code is placed in the associated Code-Behind (.cs) file. See the usage in the next section, below.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
/*
Slightly modified version from:
https://github.com/Azure/usql/blob/master/Examples/AmbulanceDemos/AmbulanceDemoCode/Class1.cs
The extractor utilizes three helper functions which are defined below:
RemoveOptionalQuotes, ReadStringMap, and ReadIntArray
*/
namespace ReferenceGuide_Examples
{
public class DriverFunctions
{
// string RemoveOptionalQuotes(string s)
//
// Removes the quotes from string s (and de-duplicates contained "") if the string is enclosed in "
public static string RemoveOptionalQuotes(string s)
{
return (s.Length > 1 && s[0] == '"' && s[s.Length - 1] == '"') ? s.Substring(1, s.Length - 2).Replace("\"\"", "\"") : s;
}
// SqlMap<string, string> ReadStringMap(string val, string map_item_delim = ",", string map_kv_delim = ":")
//
// transforms the input string val into a SQL.MAP instance using the provided delimiters to separate key-value pairs and the key and value in each pair.
// Both the key and value types are string.
public static SqlMap<string, string> ReadStringMap(string val, string map_item_delim = ",", string map_kv_delim = ":")
{
return new SqlMap<string, string>(
from p in val.Split(new string[]
{
map_item_delim
}, StringSplitOptions.None)
select new KeyValuePair<string, string>(p.Split(new string[]
{
map_kv_delim
}, StringSplitOptions.None)[0], p.Split(new string[]
{
map_kv_delim
}, StringSplitOptions.None)[1]));
}
// SqlArray<int> ReadIntArray(string val, string array_item_delim = ",")
//
// returns a SQL.ARRAY<int> from the input string val using the provided array item delimiter.
public static SqlArray<int> ReadIntArray(string val, string array_item_delim = ",")
{
return new SqlArray<int>(
from x in val.Split(new string[]
{
array_item_delim
}, StringSplitOptions.None)
select Convert.ToInt32(x));
}
}
// DriverExtractor
//
// Defines a user-defined extractor that supports reading CSV-like data into SQL.MAP<string,string> columns and SQL.ARRAY<int> columns.
// The extractor assumes homogeneous row formats and can be parallelized.
//
// Usage (after registration of assembly and referencing assembly in script, default values shown):
// EXTRACT ... FROM ...
// USING new ReferenceGuide_Examples.DriverExtractor(row_delim:"\r\n", col_delim: ",", map_item_delim: ",", map_kv_delim:":", array_item_delim:",", encoding:Encoding.UTF8);
//
[SqlUserDefinedExtractor(AtomicFileProcessing = false)]
public class DriverExtractor : IExtractor
{
// Class variables that are set with the class initializer
private byte[] _row_delim;
private string _col_delim;
private string _map_item_delim;
private string _map_kv_delim;
private string _array_item_delim;
private Encoding _encoding;
// DriverExtractor(string row_delim = "\r\n", string col_delim = ",", string map_item_delim = ",", string map_kv_delim = ":", string array_item_delim = ",", Encoding encoding = null)
//
// Class initializer that provides optional extractor parameters.
public DriverExtractor(string row_delim = "\r\n", string col_delim = ",", string map_item_delim = ",", string map_kv_delim = ":", string array_item_delim = ",", Encoding encoding = null)
{
this._encoding = ((encoding == null) ? Encoding.UTF8 : encoding);
this._row_delim = this._encoding.GetBytes(row_delim);
this._col_delim = col_delim;
this._map_item_delim = map_item_delim;
this._map_kv_delim = map_kv_delim;
this._array_item_delim = array_item_delim;
}
// void OutputValueAtCol_I(string c, int i, IUpdatableRow outputrow)
//
// Helper function that takes the string value c and puts it into the column at position i in the output row.
// The value will be cast to the expected type of the column.
private void OutputValueAtCol_I(string c, int i, IUpdatableRow outputrow)
{
ISchema schema = outputrow.Schema;
if (schema[i].Type == typeof(SqlMap<string, string>))
{
c = DriverFunctions.RemoveOptionalQuotes(c);
SqlMap<string, string> scopeMap = String.IsNullOrEmpty(c) ? null : DriverFunctions.ReadStringMap(c, this._map_item_delim, this._map_kv_delim);
outputrow.Set<SqlMap<string, string>>(i, scopeMap);
}
else if (schema[i].Type == typeof(SqlArray<int>))
{
c = DriverFunctions.RemoveOptionalQuotes(c);
SqlArray<int> scopeArray = String.IsNullOrEmpty(c) ? null : DriverFunctions.ReadIntArray(c, this._array_item_delim);
outputrow.Set<SqlArray<int>>(i, scopeArray);
}
else if (schema[i].Type == typeof(int))
{
int num = Convert.ToInt32(c);
outputrow.Set<int>(i, num);
}
else if (schema[i].Type == typeof(int?))
{
int? num2 = (c == "") ? null : new int?(Convert.ToInt32(c));
outputrow.Set<int?>(i, num2);
}
else if (schema[i].Type == typeof(long))
{
long num3 = Convert.ToInt64(c);
outputrow.Set<long>(i, num3);
}
else if (schema[i].Type == typeof(long?))
{
long? num4 = (c == "") ? null : new long?(Convert.ToInt64(c));
outputrow.Set<long?>(i, num4);
}
else if (schema[i].Type == typeof(DateTime))
{
DateTime dateTime = Convert.ToDateTime(c);
outputrow.Set<DateTime>(i, dateTime);
}
else if (schema[i].Type == typeof(DateTime?))
{
DateTime? dateTime2 = (c == "") ? null : new DateTime?(Convert.ToDateTime(c));
outputrow.Set<DateTime?>(i, dateTime2);
}
else if (schema[i].Type == typeof(string))
{
string text = DriverFunctions.RemoveOptionalQuotes(c);
outputrow.Set<string>(i, text);
}
else
{
outputrow.Set<string>(i, c);
}
}
// IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow outputrow)
//
// Actual implementation of DriverExtractor that overrides the Extract method of IExtractor.
public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow outputrow)
{
foreach (Stream current in input.Split(this._row_delim))
{
using (StreamReader streamReader = new StreamReader(current, this._encoding))
{
int num = 0;
string[] array = streamReader.ReadToEnd().Split(new string[] { this._col_delim }, StringSplitOptions.None);
for (int i = 0; i < array.Length; i++)
{
string c = array[i];
this.OutputValueAtCol_I(c, num++, outputrow);
}
}
yield return outputrow.AsReadOnly();
}
yield break;
}
}
}
Using User-Defined Extractor - DriverExtractor
Defines a user-defined extractor that supports reading CSV-like data into SQL.MAP<string,string> columns and SQL.ARRAY<int> columns. The extractor assumes homogeneous row formats and can be parallelized. This uses the Code-Behind from the previous section, above.
@Drivers =
EXTRACT driver_id int
, name string
, street string
, city string
, region string
, zipcode string
, country string
, phone_numbers SQL.MAP<string, string>
FROM "/Samples/Data/AmbulanceData/Drivers.txt"
USING new ReferenceGuide_Examples.DriverExtractor(col_delim: "\t", encoding: Encoding.Unicode);
// Optional if you want to view results
@result =
SELECT name,
r.key.Trim() AS PhoneType, r.value AS PhoneNumber
FROM @Drivers
CROSS APPLY
EXPLODE(phone_numbers) AS r(key, value);
OUTPUT @result
TO "/ReferenceGuide/QSE/Extract/DriverExtractor.txt"
USING Outputters.Tsv(encoding:Encoding.UTF8, quoting:false);
User-Defined Extractor - FlexExtractor
C# code is placed in the associated Code-Behind (.cs) file. See the usage in the next section, below.
using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
namespace FlexibleSchemaExtractor
{
public class FlexExtractor : IExtractor
{
private Encoding _encoding;
private byte[] _row_delim;
private string _col_delim;
public FlexExtractor(Encoding encoding = null, string row_delim = "\r\n", string col_delim = ",")
{
this._encoding = ((encoding == null) ? Encoding.UTF8 : encoding);
this._row_delim = this._encoding.GetBytes(row_delim);
this._col_delim = col_delim;
}
public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
{
var colsInSchema = output.Schema.Count;
// let's check global assumptions
// - first 4 provided columns are int, string, int, decimal.
if (output.Schema[0].Type != typeof(System.Int32)
|| output.Schema[1].Type != typeof(System.String)
|| output.Schema[2].Type != typeof(System.Int32)
|| output.Schema[3].Type != typeof(System.Decimal)
)
{
throw new Exception("First 4 columns are not of expected types int32, string, int32, decimal.");
}
foreach (Stream currentline in input.Split(this._row_delim))
{
using (StreamReader lineReader = new StreamReader(currentline, this._encoding))
{
string[] columns = lineReader.ReadToEnd().Split(new string[] { this._col_delim }
, StringSplitOptions.None);
var colsInData = columns.Length;
// let's check row level assumptions
// - if the schema specifies fewer columns than the data contains, the last schema column needs to be of type SqlMap<Int32, string>
if (colsInData > colsInSchema
&& output.Schema[colsInSchema - 1].Type != typeof(SqlMap<Int32, string>))
{
throw new Exception(
"Too many columns detected and last column is not of type SqlMap<Int32,string>. "
+ "Add a final column of type SqlMap<Int32,string> into your extract schema.");
}
// Set first 4 fixed columns
output.Set<Int32>(0, Int32.Parse(columns[0]));
output.Set<String>(1, columns[1]);
output.Set<Int32>(2, Int32.Parse(columns[2]));
output.Set<Decimal>(3, Decimal.Parse(columns[3]));
// Fill all remaining columns except the last which may be a map
for (int i = 4; i < Math.Min(colsInData, colsInSchema) - 1; i++)
{
output.Set<String>(i, columns[i]);
}
// Now handle last column: if it is a map
if (colsInData >= colsInSchema
&& output.Schema[colsInSchema - 1].Type == typeof(SqlMap<Int32, string>))
{
var sqlmap = new Dictionary<Int32, string>();
for (int j = colsInSchema - 1; j < colsInData; j++)
{
sqlmap.Add(j - colsInSchema + 1, columns[j]);
}
output.Set<SqlMap<Int32, string>>(colsInSchema - 1, new SqlMap<Int32, string>(sqlmap));
}
// Now handle last column: if it is not a map
else if (colsInData == Math.Min(colsInData, colsInSchema))
{
output.Set<string>(colsInData - 1, columns[colsInData - 1]);
}
yield return output.AsReadOnly();
}
}
}
}
}
Using User-Defined Extractor - FlexExtractor
A flexible-schema extractor that can read a row-oriented file whose rows have different column counts. The first four columns are fixed as order ID, product type, ordered amount, and per-item price. The remaining columns, both in number and semantics, depend on the product type. This example is taken from How to deal with files containing rows with different column counts in U-SQL: Introducing a Flexible Schema Extractor. Please refer to the referenced article for full details. This uses the Code-Behind from the previous section, above.
/*
Copy the data below into a file called OrderData.csv.
1,Shoes,2,99.99,10
1,Pants,3,59,34,32
2,Camera,1,999.00,Canon,70D
2,Lens,1,999.99,Canon,100mm,Macro,1.4
2,Lens,1,459.99,Sigma,28-85mm,Macro/Zoom,2.8
3,Camera,1,745,Sony,RX-100-II
3,Shoes,1,69.99,13
*/
// Update with your actual file path.
DECLARE @input string = "/ReferenceGuide/QSE/Extract/OrderData.csv";
// Usage with all columns known
@data = EXTRACT orderid int, producttype string, orderamount int, itemprice decimal,
c1 string, c2 string, c3 string, c4 string
FROM @input
USING new FlexibleSchemaExtractor.FlexExtractor();
// product type Camera
@cameras =
SELECT orderid,
orderamount,
itemprice,
c1 AS make,
c2 AS model
FROM @data
WHERE producttype == "Camera";
OUTPUT @cameras
TO "/ReferenceGuide/QSE/Extract/cameras.csv"
USING Outputters.Csv();
// product type Lens
@lenses =
SELECT orderid,
orderamount,
itemprice,
c1 AS make,
c2 AS focallength,
c3 AS lenstype,
c4 == null ? (decimal?) null : Decimal.Parse(c4) AS aperture
FROM @data
WHERE producttype == "Lens";
OUTPUT @lenses
TO "/ReferenceGuide/QSE/Extract/lenses.csv"
USING Outputters.Csv();
/*******************************************************************/
/*** Variant Using SqlMap<Int32,string> ***/
/*******************************************************************/
// Usage with only a few known columns and a map for the rest
@data = EXTRACT orderid int, producttype string, orderamount int, itemprice decimal,
c1 string, map SqlMap<Int32,string>
FROM @input
USING new FlexibleSchemaExtractor.FlexExtractor();
// product type Camera
@cameras =
SELECT orderid,
orderamount,
itemprice,
c1 AS make,
map[0] AS model
FROM @data
WHERE producttype == "Camera";
OUTPUT @cameras
TO "/ReferenceGuide/QSE/Extract/cameras2.csv"
USING Outputters.Csv();
// product type Lens
@lenses =
SELECT orderid,
orderamount,
itemprice,
c1 AS make,
map[0] AS focallength,
map[1] AS lenstype,
map[2] == null ? (decimal?) null : Decimal.Parse(map[2]) AS aperture
FROM @data
WHERE producttype == "Lens";
OUTPUT @lenses
TO "/ReferenceGuide/QSE/Extract/lenses2.csv"
USING Outputters.Csv();
// remaining product types (serialize map generically)
@others =
SELECT orderid,
producttype,
orderamount,
itemprice,
c1,
map == null ? (string) null
: string.Join(" ; ",
from p in map
select string.Format("{0}{1}{2}", p.Key, " : ", p.Value)) AS map
FROM @data
WHERE producttype NOT IN ("Camera", "Lens");
OUTPUT @others
TO "/ReferenceGuide/QSE/Extract/others.csv"
USING Outputters.Csv();