| package graphdb |
| |
| import ( |
| "database/sql" |
| "fmt" |
| "path" |
| "strings" |
| "sync" |
| ) |
| |
| const ( |
| createEntityTable = ` |
| CREATE TABLE IF NOT EXISTS entity ( |
| id text NOT NULL PRIMARY KEY |
| );` |
| |
| createEdgeTable = ` |
| CREATE TABLE IF NOT EXISTS edge ( |
| "entity_id" text NOT NULL, |
| "parent_id" text NULL, |
| "name" text NOT NULL, |
| CONSTRAINT "parent_fk" FOREIGN KEY ("parent_id") REFERENCES "entity" ("id"), |
| CONSTRAINT "entity_fk" FOREIGN KEY ("entity_id") REFERENCES "entity" ("id") |
| ); |
| ` |
| |
| createEdgeIndices = ` |
| CREATE UNIQUE INDEX IF NOT EXISTS "name_parent_ix" ON "edge" (parent_id, name); |
| ` |
| ) |
| |
| // Entity with a unique id. |
| type Entity struct { |
| id string |
| } |
| |
| // An Edge connects two entities together. |
| type Edge struct { |
| EntityID string |
| Name string |
| ParentID string |
| } |
| |
| // Entities stores the list of entities. |
| type Entities map[string]*Entity |
| |
| // Edges stores the relationships between entities. |
| type Edges []*Edge |
| |
| // WalkFunc is a function invoked to process an individual entity. |
| type WalkFunc func(fullPath string, entity *Entity) error |
| |
| // Database is a graph database for storing entities and their relationships. |
| type Database struct { |
| conn *sql.DB |
| mux sync.RWMutex |
| } |
| |
| // IsNonUniqueNameError processes the error to check if it's caused by |
| // a constraint violation. |
| // This is necessary because the error isn't the same across various |
| // sqlite versions. |
| func IsNonUniqueNameError(err error) bool { |
| str := err.Error() |
| // sqlite 3.7.17-1ubuntu1 returns: |
| // Set failure: Abort due to constraint violation: columns parent_id, name are not unique |
| if strings.HasSuffix(str, "name are not unique") { |
| return true |
| } |
| // sqlite-3.8.3-1.fc20 returns: |
| // Set failure: Abort due to constraint violation: UNIQUE constraint failed: edge.parent_id, edge.name |
| if strings.Contains(str, "UNIQUE constraint failed") && strings.Contains(str, "edge.name") { |
| return true |
| } |
| // sqlite-3.6.20-1.el6 returns: |
| // Set failure: Abort due to constraint violation: constraint failed |
| if strings.HasSuffix(str, "constraint failed") { |
| return true |
| } |
| return false |
| } |
| |
| // NewDatabase creates a new graph database initialized with a root entity. |
| func NewDatabase(conn *sql.DB) (*Database, error) { |
| if conn == nil { |
| return nil, fmt.Errorf("Database connection cannot be nil") |
| } |
| db := &Database{conn: conn} |
| |
| // Create root entities |
| tx, err := conn.Begin() |
| if err != nil { |
| return nil, err |
| } |
| |
| if _, err := tx.Exec(createEntityTable); err != nil { |
| return nil, err |
| } |
| if _, err := tx.Exec(createEdgeTable); err != nil { |
| return nil, err |
| } |
| if _, err := tx.Exec(createEdgeIndices); err != nil { |
| return nil, err |
| } |
| |
| if _, err := tx.Exec("DELETE FROM entity where id = ?", "0"); err != nil { |
| tx.Rollback() |
| return nil, err |
| } |
| |
| if _, err := tx.Exec("INSERT INTO entity (id) VALUES (?);", "0"); err != nil { |
| tx.Rollback() |
| return nil, err |
| } |
| |
| if _, err := tx.Exec("DELETE FROM edge where entity_id=? and name=?", "0", "/"); err != nil { |
| tx.Rollback() |
| return nil, err |
| } |
| |
| if _, err := tx.Exec("INSERT INTO edge (entity_id, name) VALUES(?,?);", "0", "/"); err != nil { |
| tx.Rollback() |
| return nil, err |
| } |
| |
| if err := tx.Commit(); err != nil { |
| return nil, err |
| } |
| |
| return db, nil |
| } |
| |
| // Close the underlying connection to the database. |
| func (db *Database) Close() error { |
| return db.conn.Close() |
| } |
| |
| // Set the entity id for a given path. |
| func (db *Database) Set(fullPath, id string) (*Entity, error) { |
| db.mux.Lock() |
| defer db.mux.Unlock() |
| |
| tx, err := db.conn.Begin() |
| if err != nil { |
| return nil, err |
| } |
| |
| var entityID string |
| if err := tx.QueryRow("SELECT id FROM entity WHERE id = ?;", id).Scan(&entityID); err != nil { |
| if err == sql.ErrNoRows { |
| if _, err := tx.Exec("INSERT INTO entity (id) VALUES(?);", id); err != nil { |
| tx.Rollback() |
| return nil, err |
| } |
| } else { |
| tx.Rollback() |
| return nil, err |
| } |
| } |
| e := &Entity{id} |
| |
| parentPath, name := splitPath(fullPath) |
| if err := db.setEdge(parentPath, name, e, tx); err != nil { |
| tx.Rollback() |
| return nil, err |
| } |
| |
| if err := tx.Commit(); err != nil { |
| return nil, err |
| } |
| return e, nil |
| } |
| |
| // Exists returns true if a name already exists in the database. |
| func (db *Database) Exists(name string) bool { |
| db.mux.RLock() |
| defer db.mux.RUnlock() |
| |
| e, err := db.get(name) |
| if err != nil { |
| return false |
| } |
| return e != nil |
| } |
| |
| func (db *Database) setEdge(parentPath, name string, e *Entity, tx *sql.Tx) error { |
| parent, err := db.get(parentPath) |
| if err != nil { |
| return err |
| } |
| if parent.id == e.id { |
| return fmt.Errorf("Cannot set self as child") |
| } |
| |
| if _, err := tx.Exec("INSERT INTO edge (parent_id, name, entity_id) VALUES (?,?,?);", parent.id, name, e.id); err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| // RootEntity returns the root "/" entity for the database. |
| func (db *Database) RootEntity() *Entity { |
| return &Entity{ |
| id: "0", |
| } |
| } |
| |
| // Get returns the entity for a given path. |
| func (db *Database) Get(name string) *Entity { |
| db.mux.RLock() |
| defer db.mux.RUnlock() |
| |
| e, err := db.get(name) |
| if err != nil { |
| return nil |
| } |
| return e |
| } |
| |
| func (db *Database) get(name string) (*Entity, error) { |
| e := db.RootEntity() |
| // We always know the root name so return it if |
| // it is requested |
| if name == "/" { |
| return e, nil |
| } |
| |
| parts := split(name) |
| for i := 1; i < len(parts); i++ { |
| p := parts[i] |
| if p == "" { |
| continue |
| } |
| |
| next := db.child(e, p) |
| if next == nil { |
| return nil, fmt.Errorf("Cannot find child for %s", name) |
| } |
| e = next |
| } |
| return e, nil |
| |
| } |
| |
| // List all entities by from the name. |
| // The key will be the full path of the entity. |
| func (db *Database) List(name string, depth int) Entities { |
| db.mux.RLock() |
| defer db.mux.RUnlock() |
| |
| out := Entities{} |
| e, err := db.get(name) |
| if err != nil { |
| return out |
| } |
| |
| children, err := db.children(e, name, depth, nil) |
| if err != nil { |
| return out |
| } |
| |
| for _, c := range children { |
| out[c.FullPath] = c.Entity |
| } |
| return out |
| } |
| |
| // Walk through the child graph of an entity, calling walkFunc for each child entity. |
| // It is safe for walkFunc to call graph functions. |
| func (db *Database) Walk(name string, walkFunc WalkFunc, depth int) error { |
| children, err := db.Children(name, depth) |
| if err != nil { |
| return err |
| } |
| |
| // Note: the database lock must not be held while calling walkFunc |
| for _, c := range children { |
| if err := walkFunc(c.FullPath, c.Entity); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // Children returns the children of the specified entity. |
| func (db *Database) Children(name string, depth int) ([]WalkMeta, error) { |
| db.mux.RLock() |
| defer db.mux.RUnlock() |
| |
| e, err := db.get(name) |
| if err != nil { |
| return nil, err |
| } |
| |
| return db.children(e, name, depth, nil) |
| } |
| |
| // Parents returns the parents of a specified entity. |
| func (db *Database) Parents(name string) ([]string, error) { |
| db.mux.RLock() |
| defer db.mux.RUnlock() |
| |
| e, err := db.get(name) |
| if err != nil { |
| return nil, err |
| } |
| return db.parents(e) |
| } |
| |
| // Refs returns the reference count for a specified id. |
| func (db *Database) Refs(id string) int { |
| db.mux.RLock() |
| defer db.mux.RUnlock() |
| |
| var count int |
| if err := db.conn.QueryRow("SELECT COUNT(*) FROM edge WHERE entity_id = ?;", id).Scan(&count); err != nil { |
| return 0 |
| } |
| return count |
| } |
| |
| // RefPaths returns all the id's path references. |
| func (db *Database) RefPaths(id string) Edges { |
| db.mux.RLock() |
| defer db.mux.RUnlock() |
| |
| refs := Edges{} |
| |
| rows, err := db.conn.Query("SELECT name, parent_id FROM edge WHERE entity_id = ?;", id) |
| if err != nil { |
| return refs |
| } |
| defer rows.Close() |
| |
| for rows.Next() { |
| var name string |
| var parentID string |
| if err := rows.Scan(&name, &parentID); err != nil { |
| return refs |
| } |
| refs = append(refs, &Edge{ |
| EntityID: id, |
| Name: name, |
| ParentID: parentID, |
| }) |
| } |
| return refs |
| } |
| |
| // Delete the reference to an entity at a given path. |
| func (db *Database) Delete(name string) error { |
| db.mux.Lock() |
| defer db.mux.Unlock() |
| |
| if name == "/" { |
| return fmt.Errorf("Cannot delete root entity") |
| } |
| |
| parentPath, n := splitPath(name) |
| parent, err := db.get(parentPath) |
| if err != nil { |
| return err |
| } |
| |
| if _, err := db.conn.Exec("DELETE FROM edge WHERE parent_id = ? AND name = ?;", parent.id, n); err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| // Purge removes the entity with the specified id |
| // Walk the graph to make sure all references to the entity |
| // are removed and return the number of references removed |
| func (db *Database) Purge(id string) (int, error) { |
| db.mux.Lock() |
| defer db.mux.Unlock() |
| |
| tx, err := db.conn.Begin() |
| if err != nil { |
| return -1, err |
| } |
| |
| // Delete all edges |
| rows, err := tx.Exec("DELETE FROM edge WHERE entity_id = ?;", id) |
| if err != nil { |
| tx.Rollback() |
| return -1, err |
| } |
| changes, err := rows.RowsAffected() |
| if err != nil { |
| return -1, err |
| } |
| |
| // Clear who's using this id as parent |
| refs, err := tx.Exec("DELETE FROM edge WHERE parent_id = ?;", id) |
| if err != nil { |
| tx.Rollback() |
| return -1, err |
| } |
| refsCount, err := refs.RowsAffected() |
| if err != nil { |
| return -1, err |
| } |
| |
| // Delete entity |
| if _, err := tx.Exec("DELETE FROM entity where id = ?;", id); err != nil { |
| tx.Rollback() |
| return -1, err |
| } |
| |
| if err := tx.Commit(); err != nil { |
| return -1, err |
| } |
| |
| return int(changes + refsCount), nil |
| } |
| |
| // Rename an edge for a given path |
| func (db *Database) Rename(currentName, newName string) error { |
| db.mux.Lock() |
| defer db.mux.Unlock() |
| |
| parentPath, name := splitPath(currentName) |
| newParentPath, newEdgeName := splitPath(newName) |
| |
| if parentPath != newParentPath { |
| return fmt.Errorf("Cannot rename when root paths do not match %s != %s", parentPath, newParentPath) |
| } |
| |
| parent, err := db.get(parentPath) |
| if err != nil { |
| return err |
| } |
| |
| rows, err := db.conn.Exec("UPDATE edge SET name = ? WHERE parent_id = ? AND name = ?;", newEdgeName, parent.id, name) |
| if err != nil { |
| return err |
| } |
| i, err := rows.RowsAffected() |
| if err != nil { |
| return err |
| } |
| if i == 0 { |
| return fmt.Errorf("Cannot locate edge for %s %s", parent.id, name) |
| } |
| return nil |
| } |
| |
| // WalkMeta stores the walk metadata. |
| type WalkMeta struct { |
| Parent *Entity |
| Entity *Entity |
| FullPath string |
| Edge *Edge |
| } |
| |
| func (db *Database) children(e *Entity, name string, depth int, entities []WalkMeta) ([]WalkMeta, error) { |
| if e == nil { |
| return entities, nil |
| } |
| |
| rows, err := db.conn.Query("SELECT entity_id, name FROM edge where parent_id = ?;", e.id) |
| if err != nil { |
| return nil, err |
| } |
| defer rows.Close() |
| |
| for rows.Next() { |
| var entityID, entityName string |
| if err := rows.Scan(&entityID, &entityName); err != nil { |
| return nil, err |
| } |
| child := &Entity{entityID} |
| edge := &Edge{ |
| ParentID: e.id, |
| Name: entityName, |
| EntityID: child.id, |
| } |
| |
| meta := WalkMeta{ |
| Parent: e, |
| Entity: child, |
| FullPath: path.Join(name, edge.Name), |
| Edge: edge, |
| } |
| |
| entities = append(entities, meta) |
| |
| if depth != 0 { |
| nDepth := depth |
| if depth != -1 { |
| nDepth-- |
| } |
| entities, err = db.children(child, meta.FullPath, nDepth, entities) |
| if err != nil { |
| return nil, err |
| } |
| } |
| } |
| |
| return entities, nil |
| } |
| |
| func (db *Database) parents(e *Entity) (parents []string, err error) { |
| if e == nil { |
| return parents, nil |
| } |
| |
| rows, err := db.conn.Query("SELECT parent_id FROM edge where entity_id = ?;", e.id) |
| if err != nil { |
| return nil, err |
| } |
| defer rows.Close() |
| |
| for rows.Next() { |
| var parentID string |
| if err := rows.Scan(&parentID); err != nil { |
| return nil, err |
| } |
| parents = append(parents, parentID) |
| } |
| |
| return parents, nil |
| } |
| |
| // Return the entity based on the parent path and name. |
| func (db *Database) child(parent *Entity, name string) *Entity { |
| var id string |
| if err := db.conn.QueryRow("SELECT entity_id FROM edge WHERE parent_id = ? AND name = ?;", parent.id, name).Scan(&id); err != nil { |
| return nil |
| } |
| return &Entity{id} |
| } |
| |
| // ID returns the id used to reference this entity. |
| func (e *Entity) ID() string { |
| return e.id |
| } |
| |
| // Paths returns the paths sorted by depth. |
| func (e Entities) Paths() []string { |
| out := make([]string, len(e)) |
| var i int |
| for k := range e { |
| out[i] = k |
| i++ |
| } |
| sortByDepth(out) |
| |
| return out |
| } |