Skip to content

Instantly share code, notes, and snippets.

@vbalashi
Created September 24, 2025 09:34
Show Gist options
  • Select an option

  • Save vbalashi/41ecf5d7e9b417d3e831911af82f0e0e to your computer and use it in GitHub Desktop.

Select an option

Save vbalashi/41ecf5d7e9b417d3e831911af82f0e0e to your computer and use it in GitHub Desktop.

generate_docs

idolnifi

The idolnifi module

LogStream Objects

class LogStream(_io.IOBase)

The LogStream class

__init__

def __init__(logFn, sep=' ', end='\n')

executePython

def executePython(args,
                  env=None,
                  stdout=stdoutLog,
                  stderr=stderrLog,
                  onexit=None)

Execute Python as a sub-process (Note: the idolnifi module is unavailable to sub-processes)

Arguments:

  • args list - Python command arguments
  • env dict - Additional environment variables
  • stdout IOBase - stdout stream
  • stderr IOBase - stderr stream
  • onexit - function to call on process exit with result code, or wait for exit if None

Returns:

Result code if onexit is not callable

executePythonToLines

def executePythonToLines(args, outputLines)

Execute Python as a sub-process, getting the command output to a list of lines (Note: the idolnifi module is unavailable to sub-processes)

Arguments:

  • args list - Python command arguments
  • outputLines list - List to append stdout/stderr lines to

Returns:

Result code

getStorageDirectory

def getStorageDirectory()

Get the path to the idol_repository storage directory for the processor

getFilesStorageDirectory

def getFilesStorageDirectory()

Get the path to the idol_repository files storage directory for the processor

getTempFile

def getTempFile(prefix='', extn='.tmp')

Get a temporary file name with the given filename prefix and extension

Arguments:

  • prefix str - Filename prefix string
  • extn str - File extension (default '.tmp')

Returns:

Temporary file name

getPropertyDescriptor

def getPropertyDescriptor(name)

Get the named PropertyDescriptor

Arguments:

  • name str - Property name

Returns:

PropertyDescriptor for the named property

isScheduled

def isScheduled()

Is the processor scheduled

Returns:

true if the processor is scheduled, false otherwise

readIOBaseToOutputStream

def readIOBaseToOutputStream(readable: _io.IOBase, outputstream)

Transfer from a readable io.IOBase to an OutputStream

Arguments:

  • outputstream OutputStream - Output stream

StateMap Objects

class StateMap()

The StateMap class

__init__

def __init__(capsule)

get

def get(key)

Returns the value associated with the given key

Arguments:

  • key str - State lookup key

Returns:

Value associated with the given key

getVersion

def getVersion()

Returns the version of the state, incremented each time the state is updated

Returns:

Version associated with the state

toMap

def toMap()

Returns a non-modifiable dictionary of all state keys and value

Returns:

Dictionary of state keys and values

StateManager Objects

class StateManager()

The StateManager class

__init__

def __init__(capsule)

clear

def clear(scope)

Clears all keys and values from the state

Arguments:

  • scope Scope - The scope to use

getState

def getState(scope)

Returns the current state for the component

Arguments:

  • scope Scope - The scope to use

Returns:

The StateMap

replace

def replace(oldState, newState, scope)

Updates the value of the state to the new value if and only if the value currently is the same as the given oldValue

Arguments:

  • oldState StateMap - the old value to compare against
  • newState dict - The value to change the state to if the state matches the oldState
  • scope Scope - The scope to use

Returns:

true if the state was updated to the new value, otherwise false

setState

def setState(state, scope)

Updates the value of the component state

Arguments:

  • state dict - The value to change the state to
  • scope Scope - The scope to use

PropertyDescriptor Objects

class PropertyDescriptor()

The PropertyDescriptor class

__init__

def __init__(capsule)

getName

def getName()

Get the name of the property

Returns:

The property name

isDynamic

def isDynamic()

Is the property dynamic

Returns:

true if the property is dynamic, false otherwise

PropertyValue Objects

class PropertyValue()

The PropertyValue class

__init__

def __init__(capsule)

getValue

def getValue()

Get the value as a string

Returns:

The value as a string

evaluateAttributeExpressions

def evaluateAttributeExpressions(flowfile=None)

Evaluate attribute expressions

Arguments:

  • flowfile FlowFile - FlowFile to use in expression evaluation

Returns:

The resulting PropertyValue

ProcessContext Objects

class ProcessContext()

The ProcessContext class

__init__

def __init__(capsule)

getName

def getName()

Get the configured name of the processor

Returns:

The processor name

getProperties

def getProperties()

Get all the properties for the processor

Returns:

Dictionary from PropertyDescriptor to value string

getProperty

def getProperty(propertydescriptor)

Get the current value for the property

Arguments:

  • propertydescriptor PropertyDescriptor | str - Property descriptor or name

Returns:

PropertyValue containing the value of the property

newPropertyValue

def newPropertyValue(value)

Get a PropertyValue from a value string

Arguments:

  • value str - Value string

Returns:

PropertyValue for the value string

getAnnotationData

def getAnnotationData()

Get the annotation data configured for this processor

Returns:

The processor annotation data

getMaxConcurrentTasks

def getMaxConcurrentTasks()

Get the maximum number of threads that this processor may be executing on at any given time

Returns:

The maximum number of processing threads

isConnectedToCluster

def isConnectedToCluster()

Get the current state of the cluster connection of this node

Returns:

true if connected to a cluster, false otherwise

getStateManager

def getStateManager()

Get the StateManager that can be used to store and retrieve state for this component

Returns:

The StateManager

hasIncomingConnection

def hasIncomingConnection()

Returns whether the processor has incoming connections

Returns:

true if the processor has one or more incoming connections, false otherwise

hasConnection

def hasConnection(relationship)

Returns whether the processor has outbound connections for a relationship name

Arguments:

  • relationship str - The relationship name

Returns:

true if the relationship has one or more outbound connections, false otherwise

yieldProcessor

def yieldProcessor()

Causes the Processor not to be scheduled for some pre-configured amount of time

relationshipAvailable

def relationshipAvailable(relationship)

Returns whether the processor has available capacity on an outbound connections

Arguments:

  • relationship str - The relationship name

Returns:

true if the relationship has available capacity, false otherwise

InputStream Objects

class InputStream(_io.RawIOBase)

The InputStream class

__init__

def __init__(capsule)

OutputStream Objects

class OutputStream(_io.RawIOBase)

The OutputStream class

__init__

def __init__(capsule)

FlowFile Objects

class FlowFile()

The FlowFile class

__init__

def __init__(capsule)

getUuid

def getUuid()

Get the unique identifier for this FlowFile

Returns:

The FlowFile UUID

getAttribute

def getAttribute(name)

Obtains the attribute value for the given key

Arguments:

  • name str - Attribute name

Returns:

The attribute value

getAttributes

def getAttributes()

Get a non-modifiable dictionary of all of the FlowFile attributes

Returns:

The dictionary of attributes

getSize

def getSize()

Get the size of the FlowFile

Returns:

Size of the FlowFile contents in bytes

ProcessSession Objects

class ProcessSession()

The ProcessSession class

__init__

def __init__(capsule)

putAttribute

def putAttribute(flowfile, name, value)

Adds an attribute to the FlowFile

Arguments:

  • flowfile FlowFile - The FlowFile
  • name str - Attribute name
  • value str - Attribute value

Returns:

The updated FlowFile

putAttributes

def putAttributes(flowfile, attributes)

Adds attributes to the FlowFile from the given dictionary

Arguments:

  • flowfile FlowFile - The FlowFile
  • attributes dict - Dictionary of attributes

Returns:

The updated FlowFile

removeAttribute

def removeAttribute(flowfile, name)

Remove an attribute from a FlowFile

Arguments:

  • flowfile FlowFile - The FlowFile
  • name str - Attribute name

Returns:

The updated FlowFile

removeAttributes

def removeAttributes(flowfile, names=[], *, pattern=None)

Remove attributes from a FlowFile by names or by matching a pattern

Arguments:

  • flowfile FlowFile - The FlowFile
  • names list - List of attribute names
  • pattern str - Pattern string to match multiple attribute names

Returns:

The updated FlowFile

read

def read(flowfile, reader: _t.Callable[[InputStream], None])

Executes the given callback against the contents corresponding to the given FlowFile

Arguments:

  • flowfile FlowFile - The FlowFile
  • reader Callable - Callable accepting an InputStream

remove

def remove(flowfile)

Remove the FlowFile from the session

Arguments:

  • flowfile FlowFile - The FlowFile

transfer

def transfer(flowfile, relationship=None)

Transfer the FlowFile to the named output relationship or back to the input queue

Arguments:

  • flowfile FlowFile - The FlowFile
  • relationship str - The output relationship name

write

def write(flowfile, writer: _t.Callable[[OutputStream], None])

Executes the given callback against the content corresponding to the given FlowFile

Arguments:

  • flowfile FlowFile - The FlowFile
  • writer Callable - Callable accepting an OutputStream

readWrite

def readWrite(flowfile, readerWriter: _t.Callable[[InputStream, OutputStream],
                                                  None])

Executes the given callback against the content corresponding to the given FlowFile

Arguments:

  • flowfile FlowFile - The FlowFile
  • readerWriter Callable - Callable accepting an InputStream and an OutputStream

penalize

def penalize(flowfile)

Penalize the FlowFile for the penalty duration

Arguments:

  • flowfile FlowFile - The FlowFile

Returns:

The penalized FlowFile

get

def get(*, maxResults=None)

Returns one FlowFile or a list of up to maxResults FlowFiles from the input queue

Arguments:

  • maxResults int - If specified, the maximum number of FlowFiles to return as a list

Returns:

One FlowFile, a list of FlowFiles, or None

getQueueSize

def getQueueSize()

Returns the number of FlowFiles in the input queue

Returns:

Number of FlowFiles

commit

def commit()

Commits the current session ensuring all operations against FlowFiles within this session are atomically persisted

rollback

def rollback()

Reverts any changes made during this session

create

def create(parent=None)

Creates a new FlowFile, optionally with parent linkage

Arguments:

  • parent FlowFile - Optional parent FlowFile

Returns:

The created FlowFile

asFlowFileDocument

def asFlowFileDocument(flowfile)

Get a FlowFileDocument representation of the FlowFile for reading and writing IDOL document format

Arguments:

  • flowfile FlowFile - The source FlowFile

Returns:

The FlowFileDocument for the FlowFile

XmlElement Objects

class XmlElement()

The XmlElement class

__init__

def __init__(capsule)

DocumentAction Objects

class DocumentAction()

The DocumentAction class

__init__

def __init__(capsule)

ContentAction Objects

class ContentAction(DocumentAction)

The ContentAction class

__init__

def __init__(capsule)

DocumentPartAction Objects

class DocumentPartAction(DocumentAction)

The DocumentPartAction class

__init__

def __init__(capsule)

FileAction Objects

class FileAction(DocumentPartAction)

The FileAction class

__init__

def __init__(capsule)

ContentFileAction Objects

class ContentFileAction(FileAction)

The ContentFileAction class

__init__

def __init__(capsule)

ContentFilenameAction Objects

class ContentFilenameAction(FileAction)

The ContentFilenameAction class

__init__

def __init__(capsule)

ExternalFileLocator Objects

class ExternalFileLocator()

The ExternalFileLocator class

The class is constructed from two arguments, the first is a string which may be one of 'JavaClassName', 'ServiceKey' or 'JavaClassName#ServiceKey', and a dictionary of location properties.

ExternalFileAction Objects

class ExternalFileAction(FileAction)

The ExternalFileAction class

__init__

def __init__(capsule)

DocMod Objects

class DocMod(dict)

The DocMod class - an alternative to using DocumentModifier with methods in FlowFileDocument by defining the implementation in a derived class.

Example:

All the action methods below may optionally be specified. Additional state may be stored as attributes in the base dictionary.

class MyDocMod(DocMod): def preAction(self, action: DocumentAction): self.mystate = "something" def onContent(self, action: ContentAction): pass def defaultFileAction(self, action: FileAction): pass def onContentFile(self, action: ContentFileAction): pass def onContentFilename(self, action: ContentFilenameAction): pass def onExternalFile(self, action: ExternalFileAction): pass def preAction(self, action: DocumentAction):

Do something maybe using self.mystate or self.handlerstate

pass

Usage:

  • doc.modify(MyDocMod({"handlerstate" - "something"}))

DocumentModifier Objects

class DocumentModifier()

The DocumentModifier class

This class provides access through a set of callbacks to the parts and metadata of a FlowFileDocument.

The class can be used in two ways to modify a document:

  • Using callbacks

    def myOnContentFilenameFunction(action: ContentFilenameAction): pass document.modify(DocumentModifier() .onContentFilename(myOnContentFilenameFunction)

  • Using a class

    class MyDocMod(DocMod): def init(self): pass def onContentFilename(self, action: ContentFilenameAction): pass document.modify(MyDocMod())

FlowFileDocument Objects

class FlowFileDocument()

The FlowFileDocument class

__init__

def __init__(capsule)

ExecuteDocumentPythonSamples — All Python Files

A consolidated reference of all .py samples in ExecuteDocumentPythonSamples, organized by folder. Each section includes the full source.

Root

_handler_1.py

"""{
    "name": "Basic handler function",
    "info": "Basic example of a handler function with ProcessContext, ProcessSession and FlowFile arguments"
}"""

from idolnifi import *

def handler(context, session, flowfile):

    #Log processor information (print can also be used to log at INFO level)
    logInfo(context.getName())

    #Create a new flowfile and set an attribute
    newflowfile = session.create()
    newflowfile = session.putAttribute(newflowfile, "an_attribute", "value")
    
    #Manipulate the new empty flowfile as a FlowFileDocument
    doc = session.asFlowFileDocument(newflowfile)
    doc.overwrite(lambda a : a.addContent('some text content'))

    # Transfer created flowfile explicitly
    session.transfer(newflowfile, "success")
    
    # FlowFiles that are not transferred will be routed based on configured properties
    
    # Return the input flowfile
    return flowfile

_install_required_packages.py

"""{
    "name": "Install required packages",
    "info": "Install packages required by the script when the processor is scheduled to run"
}"""

from idolnifi import executePython

required_packages = ['pip', 'numpy']

#Executed when the processor is scheduled to run
executePython(['-m', 'pip', 'install', '-U', '-q'] + required_packages)

import numpy

def handler():
    #Do something using the packages
    pass

classes_documentmodifier

using_modify_0.py

"""{
    "name": "Using a DocumentModifier to modify a FlowFileDocument",
    "info": "This example shows the basic usage of the DocumentModifier with callback methods"
}"""

from idolnifi import DocumentModifier, DocumentAction, ContentAction, FileAction

#Actions to perform first
def myPreAction(a: DocumentAction):
    pass

#Actions to perform for each text content page in the FlowFileDocument
def myContentAction(a: ContentAction):
    pass

#This is the default action called for all file part types in the FlowFileDocument
#The behaviour can be overridden for each by assigning the other methods on the DocumentModifier
#Each method takes an appropriate action argument type based on the method name
def myDefaultFileAction(a: FileAction):
    pass

#Actions to perform last
def myPostAction(a: DocumentAction):
    pass

def handler(doc):
    doc.modify(DocumentModifier()
        .preAction(myPreAction)
        .onContent(myContentAction)
        .defaultFileAction(myDefaultFileAction)
        .postAction(myPostAction))

using_modify_1.py

"""{
    "name": "Using only a postAction with a FlowFileDocument",
    "info": "The postAction method on DocumentModifier provides general operations on a FlowFileDocument like access to the attributes and metadata, and adding new content or file parts to the FlowFileDocument.  When only these operations are required, a single handler can be passed to the modify method, which accepts a DocumentAction parameter."
}"""

from idolnifi import DocumentModifier, DocumentAction

def myDocumentActionHandler(a: DocumentAction):
    #Use DocumentAction to read metadata and write files and metadata to the FlowFileDocument
    pass

def handler(doc):
    #Direct call to myDocumentActionHandler as a postAction
    doc.modify(myDocumentActionHandler)

    #Using a DocumentModifier explicitly with a postAction
    #This can be combined with additional DocumentModifier methods for access to existing content
    doc.modify(DocumentModifier().postAction(myDocumentActionHandler))

using_modify_2.py

"""{
    "name": "Using a DocMod with an init method",
    "info": "The DocMod class can be used to implement the DocumentModifier actions as a class with optional additional state.  This example uses the __init__ method to initialize the state from a parameter."
}"""

from idolnifi import DocMod, DocumentAction

class MyDocModWithInit(DocMod):
    #Initialize any state for the class instance with an explicit initializer
    def __init__(self, state = None):
        super().__init__()
        self.handlerState = state
    #Pre action to perform
    def preAction(self, a: DocumentAction):
        #Assign additional state resulting from actions
        self.state = "statedata"
    #Other similarly named methods to those in DocumentModifier can be defined
    #Post action to perform
    def postAction(self, a: DocumentAction):
        #use self.handlerState
        #use self.state
        pass

def handler(doc):
    #Use a DocMod class instance to store temporary state 
    doc.modify(MyDocModWithInit("handlerstatedata"))

using_modify_3.py

"""{
    "name": "Using a DocMod without an init method",
    "info": "The DocMod class can be used to implement the DocumentModifier actions as a class with optional additional state.  This example passes a dict to initialize the state via the super class dict initializer."
}"""

from idolnifi import DocMod, DocumentAction

class MyDocModWithoutInit(DocMod):
    #Additional state is initialized via the super class dict initializer
    #Pre action to perform
    def preAction(self, a: DocumentAction):
        #Assign additional state resulting from actions
        self.state = "statedata"
    #Other similarly named methods to those in DocumentModifier can be defined
    #Post action to perform
    def postAction(self, a: DocumentAction):
        #use self.handlerState
        #use self.state
        pass

def handler(doc):
    #Use a DocMod class instance with dictionary state
    doc.modify(MyDocModWithoutInit({"handlerState": "handlerstatedata"}))

classes_flowfiledocument

add_text_content.py

"""{
    "name": "Add document text content",
    "info": "Add text content to a FlowFileDocument"
}"""

def addTextContentFromOutputStream(o):
    o.write("text content written via an output stream");

def addTextContent(a):
    #add content from string
    a.addContent("text content from a string")

    #add content from a text file
    with open("mycontent.txt", "r") as textContentFromInputStream:
        a.addContent(textContentFromInputStream)

    #add content via a callback taking an output stream for writing
    a.addContent(addTextContentFromOutputStream)
    
def handler(doc):
    doc.modify(addTextContent)

retrieve_metadata.py

"""{
    "name": "Retrieve document metadata",
    "info": "Retrieve XML metadata values from a FlowFileDocument"
}"""

def retrieveMetadata(a):
    xml = a.getXmlMetadata()

    #Retrieve a single existing value
    value = xml.getFirstChild("EXISTING_FIELD").getValue()

    #Retrieve a list of values for elements matching an XPath
    elements = xml.getElementsByXPath("FIELD1/FIELD2")
    values = map(lambda e: e.getValue(), elements)

    #Retrieve values directly by XPath
    attrValues = xml.getValuesByXPath("FIELD1/FIELD2/@ATTR")
    
def handler(doc):
    doc.read(retrieveMetadata)

update_metadata.py

"""{
    "name": "Update document metadata",
    "info": "Update the XML metadata from a FlowFileDocument"
}"""

def updateMetadata(a):
    xml = a.getXmlMetadata()

    #Add a new field with a value
    xml.addChild("NEW_FIELD").setValue("NEW_VALUE")

    #Replace the first existing field or add a new field
    xml.getOrAddChild("MAYBE_EXISTING_FIELD").setValue("NEW_VALUE")

def handler(doc):
    doc.updateAttributes(updateMetadata)

using_python_packages

render_image.py

"""{
    "name": "Render an image",
    "info": "Render an image from calculated image data and add it to a FlowFileDocument"
}"""

from idolnifi import executePython

required_packages = ['pip', 'matplotlib', 'numpy']
executePython(['-m', 'pip', 'install', '-U', '-q'] + required_packages)

import numpy as np
import matplotlib.pyplot as plt

def mandelbrot(h, w, maxit=20, r=2):
    x = np.linspace(-2.5, 1.5, 4*h+1)
    y = np.linspace(-1.5, 1.5, 3*w+1)
    A, B = np.meshgrid(x, y)
    C = A + B*1j
    z = np.zeros_like(C)
    divtime = maxit + np.zeros(z.shape, dtype=int)
    for i in range(maxit):
        z = z**2 + C
        diverge = abs(z) > r
        div_now = diverge & (divtime == maxit)
        divtime[div_now] = i
        z[diverge] = r
    return divtime

def addImage(out):
    plt.clf()
    plt.imsave(out, mandelbrot(720, 720, 50))

def handler(doc):
    doc.modify(lambda a: a.addFile(addImage, 'mandelbrot.png', 'image/png'))

render_image_from_edges.py

"""{
    "name": "Detect edges from images in a FlowFileDocument",
    "info": "Detect the edges in images attached to a FlowFileDocument and add new images showing the edges"
}"""

from idolnifi import DocumentModifier, ReadMode, executePython

required_packages = ['pip', 'matplotlib', 'numpy', 'opencv-python']
executePython(['-m', 'pip', 'install', '-U', '-q'] + required_packages)

import numpy as np
import cv2 as cv
from matplotlib import pyplot as plt

def processFile(a):
    edges = None
    def buildEdges(imgIn):
        nonlocal edges
        filebytes = np.asarray(bytearray(imgIn.readall()), dtype=np.uint8)
        edges = cv.Canny(cv.imdecode(filebytes, cv.IMREAD_GRAYSCALE), 100, 200)
    a.readFile(buildEdges, ReadMode.READ_AND_KEEP)
    def saveImage(imgOut):
        plt.clf()
        plt.imsave(imgOut, edges, cmap='gray')
    a.addFile(saveImage, 'edges.png', 'image/png')
    
def handler(doc):
    doc.modify(DocumentModifier()
        .defaultFileAction(processFile))

create_pdf_from_images.py

"""{
    "name": "Create PDF from images",
    "info": "Create a PDF containing a page for each image file from a FlowFile"
}"""

from idolnifi import executePython, DocumentModifier, getTempFile

required_packages = ['pip', 'pillow', 'fpdf2']
executePython(['-m', 'pip', 'install', '-U', '-q'] + required_packages)

from PIL import Image
from fpdf import FPDF
from functools import partial
from io import BytesIO

def addToPdf(pdf, imgIn):
    with Image.open(BytesIO(imgIn.readall())) as image:
        pdf.add_page(format=image.size)
        pdf.image(image, 0, 0, image.size[0])

def processFile(pdf, a):
    a.readFile(partial(addToPdf, pdf))
    a.deletePart()
    
def addPdfPart(pdf, a):
    pdffile = getTempFile(extn=".pdf")
    pdf.output(pdffile)
    a.addFilename(pdffile, True, "application/pdf")

def handler(doc):
    pdf = FPDF(unit="pt")
    pdf.set_margin(0)
    pdf.set_text_color(255)
    doc.modify(DocumentModifier()
        .defaultFileAction(partial(processFile, pdf))
        .postAction(partial(addPdfPart, pdf)))

highlight_image_regions.py

"""{
    "name": "Highlight regions in an image",
    "info": "Highlight regions in images using rectangle data specified in the file part metadata"
}"""

from idolnifi import executePython, DocumentModifier

required_packages = ['pip', 'pillow']
executePython(['-m', 'pip', 'install', '-U', '-q'] + required_packages)

from PIL import Image, ImageDraw
from functools import partial
from io import BytesIO

def updateImage(regions, imgIn, imgOut):
    with (
        Image.open(BytesIO(imgIn.readall())).convert("RGBA") as image,
        Image.new('RGBA', image.size) as rectsImage
    ):
        draw = ImageDraw.Draw(rectsImage)
        for region in regions:
            rect = list(map(int, region.split(",")))
            draw.rectangle(rect, "#ffff004b", "#ffff004b", 3)
        Image.alpha_composite(image, rectsImage).save(imgOut, "PNG")

def processFile(a):
    #region field format "left,top,right,bottom" (e.g. "100,100,200,200" square at (100,100))
    regions = a.getPartXmlMetadata().getValuesByXPath(".//regions/region")
    a.transformFile(partial(updateImage, regions))

def handler(doc):
    doc.modify(DocumentModifier()
        .defaultFileAction(processFile))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment