The idolnifi module
class LogStream(_io.IOBase)
The LogStream class
def __init__(logFn, sep=' ', end='\n')
def executePython(args,
env=None,
stdout=stdoutLog,
stderr=stderrLog,
onexit=None)
Execute Python as a sub-process (note: the idolnifi module is unavailable to sub-processes)
Arguments:
args (list) - Python command arguments
env (dict) - Additional environment variables
stdout (IOBase) - stdout stream
stderr (IOBase) - stderr stream
onexit - Function to call with the result code when the process exits, or None to wait for the process to exit
Returns:
Result code if onexit is not callable
def executePythonToLines(args, outputLines)
Execute Python as a sub-process, appending the command output to a list of lines (note: the idolnifi module is unavailable to sub-processes)
Arguments:
args (list) - Python command arguments
outputLines (list) - List to append stdout/stderr lines to
Returns:
Result code
def getStorageDirectory()
Get the path to the idol_repository storage directory for the processor
def getFilesStorageDirectory()
Get the path to the idol_repository files storage directory for the processor
def getTempFile(prefix='', extn='.tmp')
Get a temporary file name with the given filename prefix and extension
Arguments:
prefix (str) - Filename prefix string
extn (str) - File extension (default '.tmp')
Returns:
Temporary file name
def getPropertyDescriptor(name)
Get the named PropertyDescriptor
Arguments:
name (str) - Property name
Returns:
PropertyDescriptor for the named property
def isScheduled()
Returns whether the processor is scheduled
Returns:
true if the processor is scheduled, false otherwise
def readIOBaseToOutputStream(readable: _io.IOBase, outputstream)
Transfer data from a readable io.IOBase to an OutputStream
Arguments:
readable (io.IOBase) - Readable stream to transfer from
outputstream (OutputStream) - Output stream
class StateMap()
The StateMap class
def __init__(capsule)
def get(key)
Returns the value associated with the given key
Arguments:
key (str) - State lookup key
Returns:
Value associated with the given key
def getVersion()
Returns the version of the state, incremented each time the state is updated
Returns:
Version associated with the state
def toMap()
Returns a non-modifiable dictionary of all state keys and values
Returns:
Dictionary of state keys and values
class StateManager()
The StateManager class
def __init__(capsule)
def clear(scope)
Clears all keys and values from the state
Arguments:
scope (Scope) - The scope to use
def getState(scope)
Returns the current state for the component
Arguments:
scope (Scope) - The scope to use
Returns:
The StateMap
def replace(oldState, newState, scope)
Updates the state to the new value if and only if the current state is the same as the given oldState
Arguments:
oldState (StateMap) - The old value to compare against
newState (dict) - The value to change the state to if the state matches oldState
scope (Scope) - The scope to use
Returns:
true if the state was updated to the new value, otherwise false
def setState(state, scope)
Updates the value of the component state
Arguments:
state (dict) - The value to change the state to
scope (Scope) - The scope to use
class PropertyDescriptor()
The PropertyDescriptor class
def __init__(capsule)
def getName()
Get the name of the property
Returns:
The property name
def isDynamic()
Returns whether the property is dynamic
Returns:
true if the property is dynamic, false otherwise
class PropertyValue()
The PropertyValue class
def __init__(capsule)
def getValue()
Get the value as a string
Returns:
The value as a string
def evaluateAttributeExpressions(flowfile=None)
Evaluate attribute expressions
Arguments:
flowfile (FlowFile) - FlowFile to use in expression evaluation
Returns:
The resulting PropertyValue
class ProcessContext()
The ProcessContext class
def __init__(capsule)
def getName()
Get the configured name of the processor
Returns:
The processor name
def getProperties()
Get all the properties for the processor
Returns:
Dictionary from PropertyDescriptor to value string
def getProperty(propertydescriptor)
Get the current value for the property
Arguments:
propertydescriptor (PropertyDescriptor | str) - Property descriptor or name
Returns:
PropertyValue containing the value of the property
def newPropertyValue(value)
Get a PropertyValue from a value string
Arguments:
value (str) - Value string
Returns:
PropertyValue for the value string
def getAnnotationData()
Get the annotation data configured for this processor
Returns:
The processor annotation data
def getMaxConcurrentTasks()
Get the maximum number of threads that this processor may be executing on at any given time
Returns:
The maximum number of processing threads
def isConnectedToCluster()
Get the current state of the cluster connection of this node
Returns:
true if connected to a cluster, false otherwise
def getStateManager()
Get the StateManager that can be used to store and retrieve state for this component
Returns:
The StateManager
def hasIncomingConnection()
Returns whether the processor has incoming connections
Returns:
true if the processor has one or more incoming connections, false otherwise
def hasConnection(relationship)
Returns whether the processor has outbound connections for a relationship name
Arguments:
relationship (str) - The relationship name
Returns:
true if the relationship has one or more outbound connections, false otherwise
def yieldProcessor()
Causes the processor not to be scheduled for some pre-configured amount of time
def relationshipAvailable(relationship)
Returns whether the processor has available capacity on the outbound connections for a relationship
Arguments:
relationship (str) - The relationship name
Returns:
true if the relationship has available capacity, false otherwise
class InputStream(_io.RawIOBase)
The InputStream class
def __init__(capsule)
class OutputStream(_io.RawIOBase)
The OutputStream class
def __init__(capsule)
class FlowFile()
The FlowFile class
def __init__(capsule)
def getUuid()
Get the unique identifier for this FlowFile
Returns:
The FlowFile UUID
def getAttribute(name)
Obtains the attribute value for the given name
Arguments:
name (str) - Attribute name
Returns:
The attribute value
def getAttributes()
Get a non-modifiable dictionary of all of the FlowFile attributes
Returns:
The dictionary of attributes
def getSize()
Get the size of the FlowFile
Returns:
Size of the FlowFile contents in bytes
class ProcessSession()
The ProcessSession class
def __init__(capsule)
def putAttribute(flowfile, name, value)
Adds an attribute to the FlowFile
Arguments:
flowfile (FlowFile) - The FlowFile
name (str) - Attribute name
value (str) - Attribute value
Returns:
The updated FlowFile
def putAttributes(flowfile, attributes)
Adds attributes to the FlowFile from the given dictionary
Arguments:
flowfile (FlowFile) - The FlowFile
attributes (dict) - Dictionary of attributes
Returns:
The updated FlowFile
def removeAttribute(flowfile, name)
Remove an attribute from a FlowFile
Arguments:
flowfile (FlowFile) - The FlowFile
name (str) - Attribute name
Returns:
The updated FlowFile
def removeAttributes(flowfile, names=[], *, pattern=None)
Remove attributes from a FlowFile by name or by matching a pattern
Arguments:
flowfile (FlowFile) - The FlowFile
names (list) - List of attribute names
pattern (str) - Pattern string to match multiple attribute names
Returns:
The updated FlowFile
def read(flowfile, reader: _t.Callable[[InputStream], None])
Executes the given callback against the content of the given FlowFile
Arguments:
flowfile (FlowFile) - The FlowFile
reader (Callable) - Callable accepting an InputStream
def remove(flowfile)
Remove the FlowFile from the session
Arguments:
flowfile (FlowFile) - The FlowFile
def transfer(flowfile, relationship=None)
Transfer the FlowFile to the named output relationship or back to the input queue
Arguments:
flowfile (FlowFile) - The FlowFile
relationship (str) - The output relationship name
def write(flowfile, writer: _t.Callable[[OutputStream], None])
Executes the given callback against the content of the given FlowFile
Arguments:
flowfile (FlowFile) - The FlowFile
writer (Callable) - Callable accepting an OutputStream
def readWrite(flowfile, readerWriter: _t.Callable[[InputStream, OutputStream],
None])
Executes the given callback against the content of the given FlowFile
Arguments:
flowfile (FlowFile) - The FlowFile
readerWriter (Callable) - Callable accepting an InputStream and an OutputStream
def penalize(flowfile)
Penalize the FlowFile for the penalty duration
Arguments:
flowfile (FlowFile) - The FlowFile
Returns:
The penalized FlowFile
def get(*, maxResults=None)
Returns one FlowFile or a list of up to maxResults FlowFiles from the input queue
Arguments:
maxResults (int) - If specified, the maximum number of FlowFiles to return as a list
Returns:
One FlowFile, a list of FlowFiles, or None
def getQueueSize()
Returns the number of FlowFiles in the input queue
Returns:
Number of FlowFiles
def commit()
Commits the current session, ensuring all operations against FlowFiles within this session are atomically persisted
def rollback()
Reverts any changes made during this session
def create(parent=None)
Creates a new FlowFile, optionally with parent linkage
Arguments:
parent (FlowFile) - Optional parent FlowFile
Returns:
The created FlowFile
def asFlowFileDocument(flowfile)
Get a FlowFileDocument representation of the FlowFile for reading and writing IDOL document format
Arguments:
flowfile (FlowFile) - The source FlowFile
Returns:
The FlowFileDocument for the FlowFile
class XmlElement()
The XmlElement class
def __init__(capsule)
class DocumentAction()
The DocumentAction class
def __init__(capsule)
class ContentAction(DocumentAction)
The ContentAction class
def __init__(capsule)
class DocumentPartAction(DocumentAction)
The DocumentPartAction class
def __init__(capsule)
class FileAction(DocumentPartAction)
The FileAction class
def __init__(capsule)
class ContentFileAction(FileAction)
The ContentFileAction class
def __init__(capsule)
class ContentFilenameAction(FileAction)
The ContentFilenameAction class
def __init__(capsule)
class ExternalFileLocator()
The ExternalFileLocator class
The class is constructed from two arguments: a string, which may be one of
'JavaClassName',
'ServiceKey' or
'JavaClassName#ServiceKey',
and a dictionary of location properties.
class ExternalFileAction(FileAction)
The ExternalFileAction class
def __init__(capsule)
class DocMod(dict)
The DocMod class - an alternative to passing a DocumentModifier to the FlowFileDocument methods, in which the actions are implemented as methods of a derived class.
Example:
All the action methods below may optionally be specified. Additional state may be stored as attributes in the base dictionary.
class MyDocMod(DocMod):
def preAction(self, action: DocumentAction):
self.mystate = "something"
def onContent(self, action: ContentAction): pass
def defaultFileAction(self, action: FileAction): pass
def onContentFile(self, action: ContentFileAction): pass
def onContentFilename(self, action: ContentFilenameAction): pass
def onExternalFile(self, action: ExternalFileAction): pass
def postAction(self, action: DocumentAction):
pass
Usage:
doc.modify(MyDocMod({"handlerstate": "something"}))
class DocumentModifier()
The DocumentModifier class
This class provides access through a set of callbacks to the parts and metadata of a FlowFileDocument.
The class can be used in two ways to modify a document:
-
Using callbacks
def myOnContentFilenameFunction(action: ContentFilenameAction): pass
document.modify(DocumentModifier().onContentFilename(myOnContentFilenameFunction))
-
Using a class
class MyDocMod(DocMod):
    def __init__(self): pass
    def onContentFilename(self, action: ContentFilenameAction): pass
document.modify(MyDocMod())
class FlowFileDocument()
The FlowFileDocument class
def __init__(capsule)
A consolidated reference of all .py samples in ExecuteDocumentPythonSamples, organized by folder. Each section includes the full source.
"""{
"name": "Basic handler function",
"info": "Basic example of a handler function with ProcessContext, ProcessSession and FlowFile arguments"
}"""
from idolnifi import *
def handler(context, session, flowfile):
#Log processor information (print can also be used to log at INFO level)
logInfo(context.getName())
#Create a new flowfile and set an attribute
newflowfile = session.create()
newflowfile = session.putAttribute(newflowfile, "an_attribute", "value")
#Manipulate the new empty flowfile as a FlowFileDocument
doc = session.asFlowFileDocument(newflowfile)
doc.overwrite(lambda a : a.addContent('some text content'))
# Transfer created flowfile explicitly
session.transfer(newflowfile, "success")
# FlowFiles that are not transferred will be routed based on configured properties
# Return the input flowfile
return flowfile"""{
"name": "Install required packages",
"info": "Install packages required by the script when the processor is scheduled to run"
}"""
from idolnifi import executePython
# Top-level code: runs when the processor is scheduled to run.
# Installs or upgrades (quietly) the packages the script imports below.
required_packages = ['pip', 'numpy']
executePython(['-m', 'pip', 'install', '-U', '-q', *required_packages])
import numpy
def handler():
#Do something using the packages
pass"""{
"name": "Using a DocumentModifier to modify a FlowFileDocument",
"info": "This example shows the basic usage of the DocumentModifier with callback methods"
}"""
from idolnifi import DocumentModifier, DocumentAction, ContentAction, FileAction
def myPreAction(a: DocumentAction):
    """Hook for the actions to perform first."""
def myContentAction(a: ContentAction):
    """Hook for the actions to perform on each text content page of the FlowFileDocument."""
def myDefaultFileAction(a: FileAction):
    """Default hook, called for every file part type in the FlowFileDocument.

    Behaviour for a specific part type can be overridden by assigning the
    corresponding method on the DocumentModifier. Each of those methods takes
    the action argument type that matches its name.
    """
def myPostAction(a: DocumentAction):
    """Hook for the actions to perform last."""
def handler(doc):
doc.modify(DocumentModifier()
.preAction(myPreAction)
.onContent(myContentAction)
.defaultFileAction(myDefaultFileAction)
.postAction(myPostAction))"""{
"name": "Using only a postAction with a FlowFileDocument",
"info": "The postAction method on DocumentModifier provides general operations on a FlowFileDocument like access to the attributes and metadata, and adding new content or file parts to the FlowFileDocument. When only these operations are required, a single handler can be passed to the modify method, which accepts a DocumentAction parameter."
}"""
from idolnifi import DocumentModifier, DocumentAction
def myDocumentActionHandler(a: DocumentAction):
    """Post-action hook.

    Use the DocumentAction to read metadata and to write files and metadata
    to the FlowFileDocument.
    """
def handler(doc):
#Direct call to myDocumentActionHandler as a postAction
doc.modify(myDocumentActionHandler)
#Using a DocumentModifier explicitly with a postAction
#This can be combined with additional DocumentModifier methods for access to existing content
doc.modify(DocumentModifier().postAction(myDocumentActionHandler))"""{
"name": "Using a DocMod with an init method",
"info": "The DocMod class can be used to implement the DocumentModifier actions as a class with optional additional state. This example uses the __init__ method to initialize the state from a parameter."
}"""
from idolnifi import DocMod, DocumentAction
class MyDocModWithInit(DocMod):
    """DocMod whose extra state is set up by an explicit ``__init__``.

    Other methods named like those on DocumentModifier may also be defined.
    """

    def __init__(self, state = None):
        """Initialise the base DocMod and keep *state* as ``handlerState``."""
        super().__init__()
        self.handlerState = state

    def preAction(self, a: DocumentAction):
        """Pre action: record state produced while processing."""
        self.state = "statedata"

    def postAction(self, a: DocumentAction):
        """Post action: ``self.handlerState`` and ``self.state`` are available here."""
def handler(doc):
#Use a DocMod class instance to store temporary state
doc.modify(MyDocModWithInit("handlerstatedata"))"""{
"name": "Using a DocMod without an init method",
"info": "The DocMod class can be used to implement the DocumentModifier actions as a class with optional additional state. This example passes a dict to initialize the state via the super class dict initializer."
}"""
from idolnifi import DocMod, DocumentAction
class MyDocModWithoutInit(DocMod):
    """DocMod that defines no ``__init__`` of its own.

    Initial state is the mapping passed to the constructor, which goes to the
    inherited dict initializer. Other methods named like those on
    DocumentModifier may also be defined.
    """

    def preAction(self, a: DocumentAction):
        """Pre action: record state produced while processing."""
        self.state = "statedata"

    def postAction(self, a: DocumentAction):
        """Post action.

        The constructor mapping is readable as ``self["handlerState"]``.
        Reading it as ``self.handlerState`` presumably relies on DocMod
        exposing dict keys as attributes (TODO confirm). State set in
        preAction is available as ``self.state``.
        """
def handler(doc):
#Use a DocMod class instance with dictionary state
doc.modify(MyDocModWithoutInit({"handlerState": "handlerstatedata"}))"""{
"name": "Add document text content",
"info": "Add text content to a FlowFileDocument"
}"""
def addTextContentFromOutputStream(o):
    """Write a fixed text string to the writable stream *o*.

    Used as an ``addContent`` callback: the content is whatever the callback
    writes to the output stream it is given.
    """
    o.write("text content written via an output stream")
def addTextContent(a):
    """Add text content to the document from three kinds of source.

    In order: a literal string, the text file ``mycontent.txt`` (opened
    relative to the current working directory), and a callback that writes
    to an output stream.
    """
    a.addContent("text content from a string")

    with open("mycontent.txt", "r") as text_file:
        a.addContent(text_file)

    a.addContent(addTextContentFromOutputStream)
def handler(doc):
doc.modify(addTextContent)"""{
"name": "Retrieve document metadata",
"info": "Retrieve XML metadata values from a FlowFileDocument"
}"""
def retrieveMetadata(a):
    """Demonstrate three ways of reading values from the document XML metadata.

    Args:
        a: Action object whose ``getXmlMetadata()`` returns the metadata root
           element.
    """
    xml = a.getXmlMetadata()

    # A single value from an existing child element
    value = xml.getFirstChild("EXISTING_FIELD").getValue()

    # The values of all elements matching an XPath. Build a real list: map()
    # returns a lazy, single-use iterator that would not call getValue() at
    # all unless something consumed it.
    elements = xml.getElementsByXPath("FIELD1/FIELD2")
    values = [element.getValue() for element in elements]

    # Values (here attribute values) selected directly by XPath
    attrValues = xml.getValuesByXPath("FIELD1/FIELD2/@ATTR")
def handler(doc):
doc.read(retrieveMetadata)"""{
"name": "Update document metadata",
"info": "Update the XML metadata from a FlowFileDocument"
}"""
def updateMetadata(a):
    """Set two fields in the document XML metadata.

    ``NEW_FIELD`` is always added as a new child. ``MAYBE_EXISTING_FIELD``
    replaces the value of the first existing child of that name, or adds a
    new child if none exists.
    """
    metadata = a.getXmlMetadata()
    metadata.addChild("NEW_FIELD").setValue("NEW_VALUE")
    metadata.getOrAddChild("MAYBE_EXISTING_FIELD").setValue("NEW_VALUE")
def handler(doc):
doc.updateAttributes(updateMetadata)"""{
"name": "Render an image",
"info": "Render an image from calculated image data and add it to a FlowFileDocument"
}"""
from idolnifi import executePython
# Install or upgrade (quietly) the packages imported below
required_packages = ['pip', 'matplotlib', 'numpy']
executePython(['-m', 'pip', 'install', '-U', '-q', *required_packages])
import numpy as np
import matplotlib.pyplot as plt
def mandelbrot(h, w, maxit=20, r=2):
    """Return Mandelbrot escape-time counts over a grid of complex points.

    The real axis spans [-2.5, 1.5] with ``4*h + 1`` samples and the
    imaginary axis spans [-1.5, 1.5] with ``3*w + 1`` samples. Because
    ``np.meshgrid`` uses 'xy' indexing, the result has shape
    ``(3*w + 1, 4*h + 1)``; note that *h* sets the number of columns.

    Args:
        h: Scales the number of real-axis samples.
        w: Scales the number of imaginary-axis samples.
        maxit: Number of iterations of ``z -> z**2 + c``.
        r: Escape radius; a point with ``|z| > r`` counts as diverged.

    Returns:
        Integer array holding, for each point, the iteration index at which
        it first diverged, or *maxit* if it never did.
    """
    real_axis = np.linspace(-2.5, 1.5, 4 * h + 1)
    imag_axis = np.linspace(-1.5, 1.5, 3 * w + 1)
    re_grid, im_grid = np.meshgrid(real_axis, imag_axis)
    c = re_grid + im_grid * 1j

    z = np.zeros_like(c)
    escape_iter = maxit + np.zeros(z.shape, dtype=int)

    for it in range(maxit):
        z = z**2 + c
        escaped = abs(z) > r
        # Record the iteration only for points not already recorded
        escape_iter[escaped & (escape_iter == maxit)] = it
        # Reset escaped points to r so their values stay bounded
        z[escaped] = r
    return escape_iter
def addImage(out):
    """Save ``mandelbrot(720, 720, 50)`` as an image to the stream *out*.

    Used as the ``addFile`` writer callback in this sample.
    """
    plt.clf()  # start from a cleared current figure
    image_data = mandelbrot(720, 720, 50)
    plt.imsave(out, image_data)
def handler(doc):
doc.modify(lambda a: a.addFile(addImage, 'mandelbrot.png', 'image/png'))"""{
"name": "Detect edges from images in a FlowFileDocument",
"info": "Detect the edges in images attached to a FlowFileDocument and add new images showing the edges"
}"""
from idolnifi import DocumentModifier, ReadMode, executePython
# Install or upgrade (quietly) the packages imported below
required_packages = ['pip', 'matplotlib', 'numpy', 'opencv-python']
executePython(['-m', 'pip', 'install', '-U', '-q', *required_packages])
import numpy as np
import cv2 as cv
from matplotlib import pyplot as plt
def processFile(a):
    """Add an edge-detected copy of a file part's image to the document.

    The part content is read with ``ReadMode.READ_AND_KEEP`` and decoded as a
    grayscale image. The result of Canny edge detection (thresholds 100/200)
    is added as a new ``edges.png`` part. Parts that are empty or that OpenCV
    cannot decode get no edges part. ``cv.imdecode`` returns None for such
    data, which ``cv.Canny`` cannot accept.
    """
    edges = None

    def buildEdges(imgIn):
        nonlocal edges
        filebytes = np.asarray(bytearray(imgIn.readall()), dtype=np.uint8)
        if filebytes.size == 0:
            return  # nothing to decode
        gray = cv.imdecode(filebytes, cv.IMREAD_GRAYSCALE)
        if gray is None:
            return  # not an image format OpenCV can decode
        edges = cv.Canny(gray, 100, 200)

    a.readFile(buildEdges, ReadMode.READ_AND_KEEP)
    if edges is None:
        return

    def saveImage(imgOut):
        plt.clf()
        plt.imsave(imgOut, edges, cmap='gray')

    a.addFile(saveImage, 'edges.png', 'image/png')
def handler(doc):
doc.modify(DocumentModifier()
.defaultFileAction(processFile))"""{
"name": "Create PDF from images",
"info": "Create a PDF containing a page for each image file from a FlowFile"
}"""
from idolnifi import executePython, DocumentModifier, getTempFile
# Install or upgrade (quietly) the packages imported below
required_packages = ['pip', 'pillow', 'fpdf2']
executePython(['-m', 'pip', 'install', '-U', '-q', *required_packages])
from PIL import Image
from fpdf import FPDF
from functools import partial
from io import BytesIO
def addToPdf(pdf, imgIn):
    """Append a page to *pdf* showing the image read from *imgIn*.

    The page format is the image size (interpreted in the PDF's unit). The
    image is drawn at the origin at its full width.
    """
    data = BytesIO(imgIn.readall())
    with Image.open(data) as picture:
        width = picture.size[0]
        pdf.add_page(format=picture.size)
        pdf.image(picture, 0, 0, width)
def processFile(pdf, a):
    """Add a file part's image to *pdf* as a page, then delete the part."""
    add_page = partial(addToPdf, pdf)
    a.readFile(add_page)
    a.deletePart()
def addPdfPart(pdf, a):
    """Write *pdf* to a temporary ``.pdf`` file and add it to the document."""
    target = getTempFile(extn=".pdf")
    pdf.output(target)
    # NOTE(review): the meaning of the True flag is not shown here;
    # presumably it hands ownership of the temp file over — confirm.
    a.addFilename(target, True, "application/pdf")
def handler(doc):
pdf = FPDF(unit="pt")
pdf.set_margin(0)
pdf.set_text_color(255)
doc.modify(DocumentModifier()
.defaultFileAction(partial(processFile, pdf))
.postAction(partial(addPdfPart, pdf)))"""{
"name": "Highlight regions in an image",
"info": "Highlight regions in images using rectangle data specified in the file part metadata"
}"""
from idolnifi import executePython, DocumentModifier
# Install or upgrade (quietly) the packages imported below
required_packages = ['pip', 'pillow']
executePython(['-m', 'pip', 'install', '-U', '-q', *required_packages])
from PIL import Image, ImageDraw
from functools import partial
from io import BytesIO
def updateImage(regions, imgIn, imgOut):
    """Draw translucent yellow rectangles over an image and write it as PNG.

    Args:
        regions: Strings formatted "left,top,right,bottom".
        imgIn: Input stream holding the source image.
        imgOut: Output stream that receives the PNG result.
    """
    source = Image.open(BytesIO(imgIn.readall())).convert("RGBA")
    with source as image:
        with Image.new('RGBA', image.size) as overlay:
            pen = ImageDraw.Draw(overlay)
            for region in regions:
                box = [int(part) for part in region.split(",")]
                pen.rectangle(box, "#ffff004b", "#ffff004b", 3)
            Image.alpha_composite(image, overlay).save(imgOut, "PNG")
def processFile(a):
    """Highlight the regions listed in a file part's metadata on its image.

    Region values are read from ``.//regions/region`` in the part's XML
    metadata. Each is formatted "left,top,right,bottom"; for example,
    "100,100,200,200" is a square with its top-left corner at (100,100).
    Parts with no region values are skipped. There is nothing to highlight,
    and transforming them would rewrite the content as PNG, or fail for
    parts that are not images.
    """
    regions = a.getPartXmlMetadata().getValuesByXPath(".//regions/region")
    if not regions:
        return
    a.transformFile(partial(updateImage, regions))
def handler(doc):
    """Use processFile as the default action for all file parts of *doc*."""
    highlighter = DocumentModifier().defaultFileAction(processFile)
    doc.modify(highlighter)