add observer implementation

This commit is contained in:
Mya 2022-12-20 00:41:28 -06:00
parent 0dd41e0c37
commit 2653a91747
No known key found for this signature in database
GPG Key ID: C3ECFA648DAD27FA
7 changed files with 190 additions and 41 deletions

View File

@ -29,6 +29,7 @@ import (
"sync" "sync"
"time" "time"
"go.pitz.tech/okit/observer"
"go.pitz.tech/okit/pb" "go.pitz.tech/okit/pb"
) )
@ -49,6 +50,7 @@ var (
// NewClient produces a new default client implementation for emitting metrics. // NewClient produces a new default client implementation for emitting metrics.
func NewClient() Client { func NewClient() Client {
return Client{ return Client{
observer: observer.Local(),
level: LevelInfo, level: LevelInfo,
now: time.Now, now: time.Now,
uuid: defaultUUID, uuid: defaultUUID,
@ -58,6 +60,7 @@ func NewClient() Client {
// Client provides a default implementation. // Client provides a default implementation.
type Client struct { type Client struct {
observer observer.Observer
level Level level Level
now func() time.Time now func() time.Time
uuid func() string uuid func() string
@ -65,6 +68,12 @@ type Client struct {
callerSkip int callerSkip int
} }
// WithObservers configures the new client with the provided observers.
func (c Client) WithObservers(observers ...observer.Observer) Client {
c.observer = observer.Composite(observers)
return c
}
// WithLevel allows the current logging level to be tuned. This does not prevent traces from being emit, or metrics // WithLevel allows the current logging level to be tuned. This does not prevent traces from being emit, or metrics
// from being reported. It only modifies the behavior of logs. // from being reported. It only modifies the behavior of logs.
func (c Client) WithLevel(level Level) Client { func (c Client) WithLevel(level Level) Client {
@ -264,11 +273,9 @@ func (c Client) emit(entries ...*pb.Entry) {
for _, entry := range entries { for _, entry := range entries {
entry.Timestamp = now entry.Timestamp = now
entry.Tags = tags entry.Tags = tags
_ = DefaultFormat.Marshal(sink, entry)
} }
_ = sink.Flush() c.observer.Observe(entries)
} }
// caller attempts to get method caller information. This information is used for tracing information across an // caller attempts to get method caller information. This information is used for tracing information across an

View File

@ -25,11 +25,20 @@ import (
"go.pitz.tech/okit" "go.pitz.tech/okit"
okithttp "go.pitz.tech/okit/http" okithttp "go.pitz.tech/okit/http"
"go.pitz.tech/okit/observer"
) )
func main() { func main() {
// Enable JSON output client := okit.NewClient().
okit.DefaultFormat = okit.JSONFormat{} WithObservers(
observer.Local(
observer.Format("json"),
),
// TODO: add remote observers
)
okit.Replace(client)
// Instrument HTTP Clients // Instrument HTTP Clients
okithttp.InstrumentClient(http.DefaultClient) okithttp.InstrumentClient(http.DefaultClient)

View File

@ -27,7 +27,8 @@ import (
) )
func main() { func main() {
okit.DefaultClient = okit.DefaultClient.WithLevel(okit.LevelTrace) client := okit.NewClient().WithLevel(okit.LevelTrace)
okit.Replace(client)
var tags []okit.Tag var tags []okit.Tag

View File

@ -18,7 +18,7 @@
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
// OR OTHER DEALINGS IN THE SOFTWARE. // OR OTHER DEALINGS IN THE SOFTWARE.
package okit package format
import ( import (
"bufio" "bufio"
@ -29,33 +29,33 @@ import (
"go.pitz.tech/okit/pb" "go.pitz.tech/okit/pb"
) )
// Format defines an abstraction for writing entries to stdout/stderr. This is predominantly used in report log, trace, // Marshaler defines an abstraction for writing entries to stdout/stderr. This is predominantly used in report log, trace,
// and event information from the process. // and event information from the process.
type Format interface { type Marshaler interface {
Marshal(writer *bufio.Writer, entry *pb.Entry) error Marshal(writer *bufio.Writer, entry *pb.Entry) error
} }
// JSONFormat writes entries using JSON encoded protocol buffers. // JSON writes entries using JSON encoded protocol buffers.
type JSONFormat struct { type JSON struct {
// Marshaler
Marshaler jsonpb.Marshaler Marshaler jsonpb.Marshaler
} }
func (f JSONFormat) Marshal(writer *bufio.Writer, entry *pb.Entry) error { func (f JSON) Marshal(writer *bufio.Writer, entry *pb.Entry) error {
f.Marshaler.Marshal(writer, entry) f.Marshaler.Marshal(writer, entry)
return writer.WriteByte('\n') return writer.WriteByte('\n')
} }
// TextFormat writes entries using a custom text format. // Text writes entries using a custom text format. Entries are written using a TSV format and tags are url-encoded
type TextFormat struct{} // key-value pairs.
type Text struct{}
func (f TextFormat) Marshal(writer *bufio.Writer, entry *pb.Entry) error { func (f Text) Marshal(writer *bufio.Writer, entry *pb.Entry) error {
sink.WriteString(entry.Timestamp.AsTime().Format(time.RFC3339)) writer.WriteString(entry.Timestamp.AsTime().Format(time.RFC3339))
sink.WriteByte('\t') writer.WriteByte('\t')
sink.WriteString(strings.ToUpper(entry.Kind.String())) writer.WriteString(strings.ToUpper(entry.Kind.String()))
sink.WriteByte('\t') writer.WriteByte('\t')
sink.WriteString(entry.Scope) writer.WriteString(entry.Scope)
sink.WriteByte('\t') writer.WriteByte('\t')
for _, tag := range entry.Tags { for _, tag := range entry.Tags {
qs := tag.AsQueryString() qs := tag.AsQueryString()
@ -63,9 +63,9 @@ func (f TextFormat) Marshal(writer *bufio.Writer, entry *pb.Entry) error {
continue continue
} }
sink.WriteString(qs) writer.WriteString(qs)
sink.WriteByte('&') writer.WriteByte('&')
} }
return sink.WriteByte('\n') return writer.WriteByte('\n')
} }

81
observer/local.go Normal file
View File

@ -0,0 +1,81 @@
// Copyright (C) 2022 The OKit Authors
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
// OR OTHER DEALINGS IN THE SOFTWARE.
package observer
import (
"bufio"
"io"
"os"
"go.pitz.tech/okit/format"
"go.pitz.tech/okit/pb"
)
// LocalOption defines a mechanism that allows properties of the observer to be overridden or configured.
type LocalOption func(l *local)
// Output configures the output of the local observer to point to the provided writer.
func Output(writer io.Writer) LocalOption {
return func(l *local) {
l.output = bufio.NewWriter(writer)
}
}
// Format configures how the information is written to the target writer.
func Format(fmt string) LocalOption {
return func(l *local) {
switch fmt {
case "json":
l.format = format.JSON{}
default:
l.format = format.Text{}
}
}
}
// Local creates an Observer that writes logs to an output stream using a configured format.
func Local(opts ...LocalOption) Observer {
l := &local{
output: bufio.NewWriter(os.Stdout),
format: format.Text{},
}
for _, o := range opts {
o(l)
}
return l
}
type local struct {
output *bufio.Writer
format format.Marshaler
}
func (l *local) Observe(entries []*pb.Entry) {
// TODO: how to handle internal errors
for _, entry := range entries {
_ = l.format.Marshal(l.output, entry)
}
_ = l.output.Flush()
}

39
observer/observer.go Normal file
View File

@ -0,0 +1,39 @@
// Copyright (C) 2022 The OKit Authors
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
// OR OTHER DEALINGS IN THE SOFTWARE.
package observer
import (
"go.pitz.tech/okit/pb"
)
// Observer provides a mechanism to subscribe to entries within the okit system.
type Observer interface {
Observe(entries []*pb.Entry)
}
// Composite provides a way for multiple observers to be plugged into the system.
type Composite []Observer
func (c Composite) Observe(entries []*pb.Entry) {
for _, observer := range c {
observer.Observe(entries)
}
}

44
okit.go
View File

@ -21,19 +21,31 @@
package okit package okit
import ( import (
"bufio"
"context" "context"
"os"
) )
// DefaultClient provides a default client implementation. var (
var DefaultClient = NewClient() global chan Client
)
// DefaultFormat is used to format how data is written to stdout. func init() {
var DefaultFormat Format = TextFormat{} global = make(chan Client, 1)
global <- NewClient()
}
// TODO: make this configurable // Replace updates the default Client to be the provided one.
var sink = bufio.NewWriter(os.Stdout) func Replace(c Client) {
<-global
global <- c
}
// C returns a copy of the current default Client.
func C() Client {
c := <-global
global <- c
return c
}
// ContextKey is a holder structure that allows data to be attached to contexts. // ContextKey is a holder structure that allows data to be attached to contexts.
type ContextKey string type ContextKey string
@ -43,40 +55,40 @@ var SpanKey = ContextKey("okit.span")
// With returns a client containing additional tags. // With returns a client containing additional tags.
func With(tags ...Tag) Client { func With(tags ...Tag) Client {
return DefaultClient.WithCallerSkip(3).With(tags...) return C().With(tags...)
} }
// Emit emits an event with the provided tags. // Emit emits an event with the provided tags.
func Emit(event string, tags ...Tag) { func Emit(event string, tags ...Tag) {
DefaultClient.WithCallerSkip(3).Emit(event, tags...) C().WithCallerSkip(3).Emit(event, tags...)
} }
// Observe reports a metric value with the provided tags. // Observe reports a metric value with the provided tags.
func Observe(metric string, value float64, tags ...Tag) { func Observe(metric string, value float64, tags ...Tag) {
DefaultClient.WithCallerSkip(3).Observe(metric, value, tags...) C().WithCallerSkip(3).Observe(metric, value, tags...)
} }
// Trace traces a function call. It implements distributed tracing and bookend events. // Trace traces a function call. It implements distributed tracing and bookend events.
func Trace(ctxp *context.Context, tags ...Tag) ActiveTrace { func Trace(ctxp *context.Context, tags ...Tag) ActiveTrace {
return DefaultClient.WithCallerSkip(3).Trace(ctxp, tags...) return C().WithCallerSkip(3).Trace(ctxp, tags...)
} }
// Debug writes a message at a debug level. // Debug writes a message at a debug level.
func Debug(msg string, tags ...Tag) { func Debug(msg string, tags ...Tag) {
DefaultClient.WithCallerSkip(3).Debug(msg, tags...) C().WithCallerSkip(3).Debug(msg, tags...)
} }
// Info writes a message at a informational level. // Info writes a message at a informational level.
func Info(msg string, tags ...Tag) { func Info(msg string, tags ...Tag) {
DefaultClient.WithCallerSkip(3).Info(msg, tags...) C().WithCallerSkip(3).Info(msg, tags...)
} }
// Warn writes a message at a warning level. // Warn writes a message at a warning level.
func Warn(msg string, tags ...Tag) { func Warn(msg string, tags ...Tag) {
DefaultClient.WithCallerSkip(3).Warn(msg, tags...) C().WithCallerSkip(3).Warn(msg, tags...)
} }
// Error writes a message at a error level. // Error writes a message at a error level.
func Error(msg string, tags ...Tag) { func Error(msg string, tags ...Tag) {
DefaultClient.WithCallerSkip(3).Error(msg, tags...) C().WithCallerSkip(3).Error(msg, tags...)
} }