graphify

package
v0.0.0-...-ec844c3

This package is not in the latest version of its module.
Published: Feb 25, 2026 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Overview

Copyright 2025 Specter Ops, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

SPDX-License-Identifier: Apache-2.0

Index

Constants

const (
	SerialError                   = "error deserializing %s: %v"
	ExtractError                  = "failed to extract owner id/type from directory object: %v"
	PrincipalTypeServicePrincipal = "ServicePrincipal"
	PrincipalTypeUser             = "User"
)
const (
	IngestCountThreshold = 500
	ReconcileProperty    = "reconcile"
)

Variables

This section is empty.

Functions

func ConvertGenericEdge

func ConvertGenericEdge(entity ein.GenericEdge, converted *ConvertedData) error

func ConvertGenericNode

func ConvertGenericNode(entity ein.GenericNode, converted *ConvertedData) error

func CreateIngestDecoder

func CreateIngestDecoder(reader io.ReadSeeker, key string, targetDepth int) (*json.Decoder, error)

CreateIngestDecoder returns a JSON decoder that is positioned at the start of the array under the specified top-level key (e.g., "nodes", "edges", "data"). The returned decoder is ready to stream-decode each element of the array sequentially.

func DecodeGenericData

func DecodeGenericData[T any](batch *IngestContext, decoder *json.Decoder, sourceKind graph.Kind, conversionFunc ConversionFunc[T]) error

func IngestAzureData

func IngestAzureData(batch *IngestContext, converted ConvertedAzureData) error

func IngestBasicData

func IngestBasicData(batch *IngestContext, converted ConvertedData) error

func IngestDNRelationships

func IngestDNRelationships(batch *IngestContext, relationships []ein.IngestibleRelationship) error

func IngestGenericData

func IngestGenericData(batch *IngestContext, sourceKind graph.Kind, converted ConvertedData) error

IngestGenericData writes generic graph data into the database using the provided batch. It attempts to ingest all nodes and relationships from the ConvertedData object.

Because generic entities do not have a predefined base kind (unlike AZ or AD), this function passes graph.EmptyKind to the node and relationship ingestion functions. This indicates that no base kind should be applied uniformly to all ingested entities, and instead the kind(s) defined directly on each node or edge (if any) are used as-is.

func IngestGroupData

func IngestGroupData(batch *IngestContext, converted ConvertedGroupData) error

func IngestNode

func IngestNode(ic *IngestContext, baseKind graph.Kind, nextNode ein.IngestibleNode) error

func IngestNodes

func IngestNodes(ingestCtx *IngestContext, baseKind graph.Kind, nodes []ein.IngestibleNode) error

func IngestRelationships

func IngestRelationships(ingestCtx *IngestContext, sourceKind graph.Kind, relationships []ein.IngestibleRelationship) error

IngestRelationships resolves and writes a batch of ingestible relationships to the graph.

This function first calls resolveRelationships to resolve node identifiers based on name and kind.

Each resolved relationship update is applied to the graph via batch.UpdateRelationshipBy. Errors encountered during resolution or update are collected and returned as a single combined error.
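The collect-and-combine error handling described above can be sketched with the standard library alone. This is a minimal illustration, not the package's code: relationshipUpdate, applyOrFail, and applyAll are hypothetical names, and the use of errors.Join to build the combined error is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// relationshipUpdate is a hypothetical, simplified stand-in for graph.RelationshipUpdate.
type relationshipUpdate struct {
	Start, End, Kind string
}

// applyOrFail is a hypothetical writer that rejects updates with an unresolved endpoint.
func applyOrFail(u relationshipUpdate) error {
	if u.Start == "" || u.End == "" {
		return errors.New("unresolved endpoint")
	}
	return nil
}

// applyAll attempts every update, keeps going after failures, and returns all
// failures as one combined error (nil when every update succeeds).
func applyAll(updates []relationshipUpdate, apply func(relationshipUpdate) error) error {
	var errs []error
	for _, u := range updates {
		if err := apply(u); err != nil {
			errs = append(errs, fmt.Errorf("%s-[%s]->%s: %w", u.Start, u.Kind, u.End, err))
		}
	}
	return errors.Join(errs...)
}

func main() {
	updates := []relationshipUpdate{
		{Start: "alice", End: "admins", Kind: "MemberOf"},
		{Start: "bob", End: "", Kind: "MemberOf"},
	}
	if err := applyAll(updates, applyOrFail); err != nil {
		fmt.Println("ingest finished with errors:", err)
	}
}
```

Collecting errors instead of returning on the first one lets a single bad relationship be reported without discarding the rest of the batch.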

func IngestSessions

func IngestSessions(batch *IngestContext, sessions []ein.IngestibleSession) error

func IngestWrapper

func IngestWrapper(batch *IngestContext, reader io.ReadSeeker, meta ingest.OriginalMetadata, readOpts ReadOptions) error

IngestWrapper dispatches the ingest process based on the metadata's type.

func InitializeIngestMetrics

func InitializeIngestMetrics(registerer prometheus.Registerer) error

InitializeIngestMetrics registers the ingestion throughput gauge with the Prometheus registry

func MergeNodeKinds

func MergeNodeKinds(sourceKind graph.Kind, additionalKinds ...graph.Kind) []graph.Kind

MergeNodeKinds combines a source kind with any additional kinds, then removes any occurrences of graph.EmptyKind from the result. Ensures a clean, usable kind list for downstream logic.
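A rough sketch of the merge-then-filter behavior described above. The Kind and EmptyKind definitions here are hypothetical string-based stand-ins for graph.Kind and graph.EmptyKind, and placing the source kind first in the result is an assumption.

```go
package main

import "fmt"

// Kind is a hypothetical stand-in for graph.Kind.
type Kind string

// EmptyKind is a hypothetical stand-in for graph.EmptyKind.
const EmptyKind Kind = ""

// mergeNodeKinds places sourceKind ahead of additionalKinds and drops every
// EmptyKind entry from the combined list.
func mergeNodeKinds(sourceKind Kind, additionalKinds ...Kind) []Kind {
	merged := make([]Kind, 0, len(additionalKinds)+1)
	for _, kind := range append([]Kind{sourceKind}, additionalKinds...) {
		if kind != EmptyKind {
			merged = append(merged, kind)
		}
	}
	return merged
}

func main() {
	// A generic ingest passes EmptyKind as the source kind, so only the kinds
	// defined on the node itself survive.
	fmt.Println(mergeNodeKinds(EmptyKind, "User", EmptyKind, "Base"))
}
```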

func PublishIngestThroughput

func PublishIngestThroughput(nodesProcessed, relsProcessed, nodesWritten, relsWritten int64, duration time.Duration)

PublishIngestThroughput publishes ingestion throughput metrics to Prometheus

func ReadFileForIngest

func ReadFileForIngest(batch *IngestContext, reader io.ReadSeeker, options ReadOptions) error

ReadFileForIngest orchestrates the ingestion of a file into the graph database, performing any necessary metadata validation and schema enforcement before delegating to the core ingest logic.

If the file type is ZIP, additional validation is performed using JSON Schema, and the full stream is consumed to enable downstream readers to function correctly. Zip files are validated here and not at file upload time because it would be expensive to decompress the entire zip into memory. Files that fail this validation step will not be processed further.

Returns an error if metadata validation or ingestion fails.

func SeekToKey

func SeekToKey(decoder *json.Decoder, key string, targetDepth int) error

SeekToKey positions the JSON decoder at the value of the given key, which must appear at the specified object depth. If the key is "nodes" or "edges" or "data", the decoder advances past the key and opening '[' token, positioning at the first array element. For other keys (e.g., "metadata"), the decoder stops at the delimiter token itself (e.g., '{') so that callers can handle decoding.
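The depth-aware seek can be approximated with json.Decoder.Token. The sketch below is an illustration under stated assumptions, not the package's implementation: seekToKey, seekToArray, and collectIDs are hypothetical helpers, and depth is assumed to count open containers, so a key of the top-level object sits at depth 1.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// frame records whether an open container is an object and, if so, whether
// the next string token inside it is a key rather than a value.
type frame struct{ isObject, expectKey bool }

// seekToKey consumes tokens until it has read key as an object key at
// targetDepth, leaving the decoder positioned just before that key's value.
func seekToKey(dec *json.Decoder, key string, targetDepth int) error {
	var stack []frame
	valueDone := func() { // the enclosing object, if any, expects a key next
		if n := len(stack); n > 0 && stack[n-1].isObject {
			stack[n-1].expectKey = true
		}
	}
	for {
		tok, err := dec.Token()
		if err == io.EOF {
			return fmt.Errorf("key %q not found at depth %d", key, targetDepth)
		} else if err != nil {
			return err
		}
		switch t := tok.(type) {
		case json.Delim:
			switch t {
			case '{':
				stack = append(stack, frame{isObject: true, expectKey: true})
			case '[':
				stack = append(stack, frame{})
			default: // '}' or ']' closes the innermost container
				stack = stack[:len(stack)-1]
				valueDone()
			}
		case string:
			if n := len(stack); n > 0 && stack[n-1].isObject && stack[n-1].expectKey {
				if n == targetDepth && t == key {
					return nil
				}
				stack[n-1].expectKey = false
			} else {
				valueDone()
			}
		default: // numbers, booleans, and null are always values
			valueDone()
		}
	}
}

// seekToArray seeks to key and then consumes the opening '[' of its value.
func seekToArray(dec *json.Decoder, key string, targetDepth int) error {
	if err := seekToKey(dec, key, targetDepth); err != nil {
		return err
	}
	tok, err := dec.Token()
	if err != nil {
		return err
	}
	if d, ok := tok.(json.Delim); !ok || d != '[' {
		return fmt.Errorf("value of %q is not an array", key)
	}
	return nil
}

// collectIDs stream-decodes the elements of the top-level "data" array.
func collectIDs(payload string) ([]int, error) {
	dec := json.NewDecoder(strings.NewReader(payload))
	if err := seekToArray(dec, "data", 1); err != nil {
		return nil, err
	}
	var ids []int
	for dec.More() {
		var item struct {
			ID int `json:"id"`
		}
		if err := dec.Decode(&item); err != nil {
			return nil, err
		}
		ids = append(ids, item.ID)
	}
	return ids, nil
}

func main() {
	// The nested "data" key at depth 2 is skipped because targetDepth is 1.
	fmt.Println(collectIDs(`{"meta":{"data":"decoy"},"data":[{"id":1},{"id":2}]}`))
}
```

Tracking key versus value position matters because a string value equal to the key name, or the same key nested deeper in the document, would otherwise produce a false match.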

Types

type AzureBase

type AzureBase struct {
	Kind enums.Kind      `json:"kind"`
	Data json.RawMessage `json:"data"`
}

type BatchUpdater

type BatchUpdater interface {
	UpdateNodeBy(update graph.NodeUpdate) error
	UpdateRelationshipBy(update graph.RelationshipUpdate) error
	Nodes() graph.NodeQuery
	Relationships() graph.RelationshipQuery
}

BatchUpdater represents the ingestion-facing API for a dawgs BatchOperation

func NewCountingBatchUpdater

func NewCountingBatchUpdater(inner BatchUpdater, stats *IngestStats) BatchUpdater

NewCountingBatchUpdater creates a BatchUpdater wrapper that tracks operation counts
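A counting wrapper of this kind is a small decorator. The following sketch uses a reduced, hypothetical one-method interface (batchUpdater) and a hypothetical in-memory batch (memoryBatch); counting only successful writes is an assumption about the behavior, not something the documentation states.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// nodeUpdate is a hypothetical, simplified stand-in for graph.NodeUpdate.
type nodeUpdate struct{ ID string }

// batchUpdater is a reduced, hypothetical version of the BatchUpdater interface.
type batchUpdater interface {
	UpdateNodeBy(update nodeUpdate) error
}

// stats is a reduced, hypothetical version of IngestStats.
type stats struct{ NodesWritten atomic.Int64 }

// countingBatchUpdater forwards every call to inner and counts the successes.
type countingBatchUpdater struct {
	inner batchUpdater
	stats *stats
}

func (c *countingBatchUpdater) UpdateNodeBy(update nodeUpdate) error {
	if err := c.inner.UpdateNodeBy(update); err != nil {
		return err
	}
	c.stats.NodesWritten.Add(1)
	return nil
}

func newCountingBatchUpdater(inner batchUpdater, s *stats) batchUpdater {
	return &countingBatchUpdater{inner: inner, stats: s}
}

// memoryBatch is a hypothetical in-memory batch used only for this example.
type memoryBatch struct{ ids []string }

func (m *memoryBatch) UpdateNodeBy(update nodeUpdate) error {
	if update.ID == "" {
		return fmt.Errorf("node update has no ID")
	}
	m.ids = append(m.ids, update.ID)
	return nil
}

func main() {
	s := &stats{}
	batch := newCountingBatchUpdater(&memoryBatch{}, s)
	for _, id := range []string{"a", "", "b"} {
		if err := batch.UpdateNodeBy(nodeUpdate{ID: id}); err != nil {
			fmt.Println("skipped:", err)
		}
	}
	fmt.Println("nodes written:", s.NodesWritten.Load())
}
```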

type ChangeManager

type ChangeManager interface {
	ResolveChange(change changelog.Change) (bool, error)
	Submit(ctx context.Context, change changelog.Change) bool
	FlushStats()
	ClearCache(ctx context.Context)
}

ChangeManager represents the ingestion-facing API for the changelog daemon.

It has three responsibilities:

  • Deduplication: ResolveChange determines whether a proposed change is new or modified and therefore requires persistence, or whether it has already been seen.
  • Submission: Submit enqueues a change for asynchronous processing by the changelog loop.
  • Metrics: FlushStats logs and resets internal cache hit/miss statistics, allowing callers to observe deduplication efficiency over time.

To generate mocks of this interface for use at unit-testing seams in the application, run:

mockgen -source=ingest.go -destination=mocks/ingest.go -package=mocks
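To make the deduplication and metrics responsibilities concrete, here is a toy, single-goroutine sketch. It is not the changelog daemon: change, cache, and newCache are hypothetical, hashing the properties with SHA-256 is an assumption, and recording the hash inside ResolveChange rather than on Submit is a simplification.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
)

// change is a hypothetical, simplified stand-in for changelog.Change.
type change struct {
	ID         string
	Properties map[string]string
}

// hash digests the properties in sorted key order so equal maps hash equally.
func (c change) hash() [sha256.Size]byte {
	keys := make([]string, 0, len(c.Properties))
	for k := range c.Properties {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	h := sha256.New()
	for _, k := range keys {
		fmt.Fprintf(h, "%q=%q;", k, c.Properties[k])
	}
	var sum [sha256.Size]byte
	copy(sum[:], h.Sum(nil))
	return sum
}

// cache remembers the last hash seen per ID. It is not safe for concurrent use.
type cache struct {
	seen         map[string][sha256.Size]byte
	hits, misses int
}

func newCache() *cache { return &cache{seen: map[string][sha256.Size]byte{}} }

// ResolveChange reports true when the change is new or modified and therefore
// needs to be persisted, and false when an identical change was already seen.
func (c *cache) ResolveChange(ch change) (bool, error) {
	sum := ch.hash()
	if prev, ok := c.seen[ch.ID]; ok && prev == sum {
		c.hits++
		return false, nil
	}
	c.seen[ch.ID] = sum
	c.misses++
	return true, nil
}

// FlushStats logs and resets the hit/miss counters.
func (c *cache) FlushStats() {
	fmt.Printf("cache hits=%d misses=%d\n", c.hits, c.misses)
	c.hits, c.misses = 0, 0
}

func main() {
	c := newCache()
	node := change{ID: "S-1-5-21-1", Properties: map[string]string{"name": "alice"}}
	for i := 0; i < 3; i++ {
		shouldWrite, _ := c.ResolveChange(node)
		fmt.Println("write needed:", shouldWrite)
	}
	c.FlushStats()
}
```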

type ConversionFunc

type ConversionFunc[T any] func(decoded T, converted *ConvertedData) error

ConversionFunc is a function that transforms a decoded JSON object (of type T) into its corresponding internal ingest representation, appending it to the provided ConvertedData.

T represents a specific ingest type (e.g., User, Computer, Group, etc.).
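As an illustration of how a generic conversion function might be driven by a streaming decoder, consider the sketch below. All types here (node, convertedData, user) are hypothetical simplifications of the package's types, and decodeAll is only a guess at the general shape of a decode loop, not the behavior of DecodeGenericData.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// node and convertedData are hypothetical, simplified stand-ins for
// ein.IngestibleNode and ConvertedData.
type node struct {
	ID    string
	Kinds []string
}

type convertedData struct{ NodeProps []node }

// conversionFunc mirrors the shape of ConversionFunc[T].
type conversionFunc[T any] func(decoded T, converted *convertedData) error

// user is a hypothetical decoded ingest type.
type user struct {
	ObjectIdentifier string `json:"ObjectIdentifier"`
}

// convertUser appends the internal representation of one decoded user.
func convertUser(u user, converted *convertedData) error {
	if u.ObjectIdentifier == "" {
		return fmt.Errorf("user is missing an object identifier")
	}
	converted.NodeProps = append(converted.NodeProps, node{ID: u.ObjectIdentifier, Kinds: []string{"User"}})
	return nil
}

// decodeAll reads a JSON array one element at a time and converts each element.
func decodeAll[T any](dec *json.Decoder, convert conversionFunc[T], converted *convertedData) error {
	if _, err := dec.Token(); err != nil { // consume the opening '['
		return err
	}
	for dec.More() {
		var decoded T
		if err := dec.Decode(&decoded); err != nil {
			return err
		}
		if err := convert(decoded, converted); err != nil {
			return err
		}
	}
	return nil
}

// convertUsers wires the pieces together for a JSON array of users.
func convertUsers(payload string) (convertedData, error) {
	var converted convertedData
	dec := json.NewDecoder(strings.NewReader(payload))
	err := decodeAll[user](dec, convertUser, &converted)
	return converted, err
}

func main() {
	converted, err := convertUsers(`[{"ObjectIdentifier":"S-1-5-21-1"},{"ObjectIdentifier":"S-1-5-21-2"}]`)
	fmt.Println(len(converted.NodeProps), err)
}
```

Keeping the conversion step behind a function type means one decode loop can serve every ingest type; only the type parameter and the conversion function change.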

type ConversionFuncWithTime

type ConversionFuncWithTime[T any] func(decoded T, converted *ConvertedData, ingestTime time.Time)

type ConvertedAzureData

type ConvertedAzureData struct {
	NodeProps   []ein.IngestibleNode
	RelProps    []ein.IngestibleRelationship
	OnPremNodes []ein.IngestibleNode
}

func (*ConvertedAzureData) Clear

func (s *ConvertedAzureData) Clear()

type ConvertedData

type ConvertedData struct {
	NodeProps []ein.IngestibleNode
	RelProps  []ein.IngestibleRelationship
}

func (*ConvertedData) Clear

func (s *ConvertedData) Clear()

type ConvertedGroupData

type ConvertedGroupData struct {
	NodeProps              []ein.IngestibleNode
	RelProps               []ein.IngestibleRelationship
	DistinguishedNameProps []ein.IngestibleRelationship
}

func (*ConvertedGroupData) Clear

func (s *ConvertedGroupData) Clear()

type ConvertedSessionData

type ConvertedSessionData struct {
	SessionProps []ein.IngestibleSession
}

func CreateConvertedSessionData

func CreateConvertedSessionData(count int) ConvertedSessionData

func (*ConvertedSessionData) Clear

func (s *ConvertedSessionData) Clear()

type GraphifyData

type GraphifyData interface {
	appcfg.ParameterService

	// Task handlers
	GetAllIngestTasks(ctx context.Context) (model.IngestTasks, error)
	DeleteIngestTask(ctx context.Context, ingestTask model.IngestTask) error
	GetFlagByKey(context.Context, string) (appcfg.FeatureFlag, error)

	RegisterSourceKind(context.Context) func(sourceKind graph.Kind) error
}

The GraphifyData interface is designed to manage the lifecycle of ingestion tasks

type GraphifyService

type GraphifyService struct {
	// contains filtered or unexported fields
}

func NewGraphifyService

func NewGraphifyService(ctx context.Context, db GraphifyData, graphDb graph.Database, cfg config.Configuration, schema upload.IngestSchema, changeManager ChangeManager) GraphifyService

func (*GraphifyService) NewIngestContext

func (s *GraphifyService) NewIngestContext(ctx context.Context, ingestTime time.Time, useChangelog bool) *IngestContext

func (*GraphifyService) ProcessIngestFile

func (s *GraphifyService) ProcessIngestFile(ic *IngestContext, task model.IngestTask) ([]IngestFileData, error)

ProcessIngestFile reads the files at the path supplied and returns one IngestFileData entry per file, recording any errors encountered while ingesting that file, along with an error.

func (*GraphifyService) ProcessTasks

func (s *GraphifyService) ProcessTasks(updateJob UpdateJobFunc)

func (*GraphifyService) RegisterSourceKind

func (s *GraphifyService) RegisterSourceKind(ctx context.Context) func(kind graph.Kind) error

RegisterSourceKind returns a function that registers a source kind and then refreshes the in-memory DAWGS kind map.

type IngestContext

type IngestContext struct {
	Ctx context.Context
	// Batch is the buffering/flushing mechanism that writes entities to the graph database
	Batch BatchUpdater
	// IngestTime is a single timestamp assigned to the lastseen property of every entity ingested per ingest run
	IngestTime time.Time
	// Manager is the caching layer that deduplicates ingest payloads across ingest runs
	Manager ChangeManager
	// Stats tracks the number of nodes and relationships processed during ingestion
	Stats *IngestStats

	// RetainIngestedFiles determines if the service should clean up working files after ingest
	RetainIngestedFiles bool
}

IngestContext is a container for dependencies needed by ingest

func NewIngestContext

func NewIngestContext(ctx context.Context, opts ...IngestOption) *IngestContext

func (*IngestContext) BindBatchUpdater

func (s *IngestContext) BindBatchUpdater(batch BatchUpdater)

func (*IngestContext) HasChangelog

func (s *IngestContext) HasChangelog() bool

type IngestFileData

type IngestFileData struct {
	Name         string
	ParentFile   string
	Path         string
	Errors       []string
	UserDataErrs []string
}

type IngestOption

type IngestOption func(*IngestContext)

IngestOption configures an IngestContext. The With* helpers construct options for NewIngestContext.

func WithBatchUpdater

func WithBatchUpdater(batchUpdater BatchUpdater) IngestOption

func WithChangeManager

func WithChangeManager(manager ChangeManager) IngestOption

func WithIngestRetentionConfig

func WithIngestRetentionConfig(shouldRetainIngestedFiles bool) IngestOption

func WithIngestTime

func WithIngestTime(ingestTime time.Time) IngestOption
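The option helpers follow the functional options pattern. A minimal sketch of that pattern, using a hypothetical reduced ingestContext with only three fields and lower-case names to make clear it is not the package's code:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// ingestContext is a hypothetical, reduced version of IngestContext.
type ingestContext struct {
	Ctx                 context.Context
	IngestTime          time.Time
	RetainIngestedFiles bool
}

// ingestOption mutates an ingestContext while it is being constructed.
type ingestOption func(*ingestContext)

func withIngestTime(ingestTime time.Time) ingestOption {
	return func(ic *ingestContext) { ic.IngestTime = ingestTime }
}

func withIngestRetentionConfig(retain bool) ingestOption {
	return func(ic *ingestContext) { ic.RetainIngestedFiles = retain }
}

// newIngestContext applies each option in order on top of the zero-value defaults.
func newIngestContext(ctx context.Context, opts ...ingestOption) *ingestContext {
	ic := &ingestContext{Ctx: ctx}
	for _, opt := range opts {
		opt(ic)
	}
	return ic
}

// exampleContext builds a context with a fixed ingest time and file retention enabled.
func exampleContext() *ingestContext {
	ingestTime := time.Date(2026, time.February, 25, 0, 0, 0, 0, time.UTC)
	return newIngestContext(context.Background(),
		withIngestTime(ingestTime),
		withIngestRetentionConfig(true),
	)
}

func main() {
	ic := exampleContext()
	fmt.Println(ic.IngestTime.Format(time.RFC3339), ic.RetainIngestedFiles)
}
```

Callers that need only the defaults can pass no options at all, which keeps the constructor signature stable as new settings are added.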

type IngestStats

type IngestStats struct {
	// Total entities processed (including deduplicated ones).
	// Consider this the number of elements present in the raw ingest payload.
	NodesProcessed         atomic.Int64
	RelationshipsProcessed atomic.Int64

	// Entities actually written to database (subset of processed)
	NodesWritten         atomic.Int64
	RelationshipsWritten atomic.Int64
}

IngestStats tracks the number of nodes and relationships processed during ingestion

func (*IngestStats) GetCounts

func (s *IngestStats) GetCounts() (nodesProcessed, relsProcessed, nodesWritten, relsWritten int64)

func (*IngestStats) Reset

func (s *IngestStats) Reset()
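The processed versus written distinction can be illustrated with a reduced, hypothetical stats type that tracks nodes only. Because the counters are atomic.Int64 values, concurrent workers can update them without a mutex. The simulate function and its "every other node is deduplicated" rule are invented for the example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// ingestStats is a hypothetical, reduced version of IngestStats (nodes only).
type ingestStats struct {
	NodesProcessed atomic.Int64
	NodesWritten   atomic.Int64
}

// GetCounts returns a snapshot of both counters.
func (s *ingestStats) GetCounts() (nodesProcessed, nodesWritten int64) {
	return s.NodesProcessed.Load(), s.NodesWritten.Load()
}

// Reset zeroes both counters.
func (s *ingestStats) Reset() {
	s.NodesProcessed.Store(0)
	s.NodesWritten.Store(0)
}

// simulate runs several workers that each process perWorker nodes and pretend
// that every other node was deduplicated and therefore never written.
func simulate(workers, perWorker int) *ingestStats {
	stats := &ingestStats{}
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				stats.NodesProcessed.Add(1)
				if i%2 == 0 {
					stats.NodesWritten.Add(1)
				}
			}
		}()
	}
	wg.Wait()
	return stats
}

func main() {
	stats := simulate(4, 100)
	processed, written := stats.GetCounts()
	fmt.Printf("processed=%d written=%d\n", processed, written)
	stats.Reset()
}
```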

type IngestUserDataError

type IngestUserDataError struct {
	Msg string
}

IngestUserDataError is returned for errors caused by the data a user is ingesting, as opposed to errors in the internal Go logic.

func (IngestUserDataError) Error

func (e IngestUserDataError) Error() string
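A caller can separate user-data errors from internal ones with errors.As. In the sketch below the error type is re-declared locally in lower case; that Error() returns Msg unchanged is an assumption, and validateNode, errInternal, and classify are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// ingestUserDataError is a local stand-in for IngestUserDataError.
type ingestUserDataError struct {
	Msg string
}

// Error returns the message unchanged (an assumption about the real method).
func (e ingestUserDataError) Error() string { return e.Msg }

// errInternal is a hypothetical internal failure used for comparison.
var errInternal = errors.New("connection reset")

// validateNode is a hypothetical check that wraps a user-data error with context.
func validateNode(id string) error {
	if id == "" {
		return fmt.Errorf("validating node: %w", ingestUserDataError{Msg: "node is missing an id"})
	}
	return nil
}

// classify labels an error as user-caused or internal, looking through wrapping.
func classify(err error) string {
	var userErr ingestUserDataError
	if errors.As(err, &userErr) {
		return "user data: " + userErr.Msg
	}
	return "internal: " + err.Error()
}

func main() {
	fmt.Println(classify(validateNode("")))
	fmt.Println(classify(errInternal))
}
```

A split like this is one way the Errors and UserDataErrs fields of IngestFileData could be populated separately, though the documentation does not say how the package does it.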

type ReadOptions

type ReadOptions struct {
	FileType           model.FileType // JSON or ZIP
	IngestSchema       upload.IngestSchema
	RegisterSourceKind registrationFn
}

type UpdateJobFunc

type UpdateJobFunc func(jobId int64, fileData []IngestFileData)

UpdateJobFunc is passed to the graphify service to let it tell us about the tasks as they are processed

The datapipe doesn't know or care about tasks, and the graphify service doesn't know or care about jobs. Instead, this func is provided as an abstraction for graphify.

Directories

Path Synopsis
Package mocks is a generated GoMock package.
